Skip to content

Commit 5e76dfd

Browse files
author
torsstei
committed
Rebased datafusion impl from h2oai#240
1 parent 701e203 commit 5e76dfd

File tree

8 files changed

+502
-2
lines changed

8 files changed

+502
-2
lines changed

_control/solutions.csv

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,3 +32,5 @@ duckdb,join
3232
modin,groupby
3333
modin,join
3434
modin,sort
35+
datafusion,groupby
36+
datafusion,join

_launcher/launcher.R

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ file.ext = function(x) {
1515
ans = switch(
1616
x,
1717
"data.table"=, "dplyr"=, "h2o"=, "arrow"=, "duckdb"="R",
18-
"pandas"=, "cudf"=, "spark"=, "pydatatable"=, "modin"=, "dask"=, "polars"="py",
18+
"pandas"=, "cudf"=, "spark"=, "pydatatable"=, "modin"=, "dask"=, "polars"="py", "datafusion"="py",
1919
"clickhouse"="sql",
2020
"juliadf"="jl", "bodo"=
2121
)

_launcher/solution.R

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ file.ext = function(x) {
111111
ans = switch(
112112
x,
113113
"data.table"=, "dplyr"=, "h2o"=, "arrow"=, "duckdb"="R",
114-
"pandas"=, "cudf"=, "spark"=, "pydatatable"=, "modin"=, "dask"=, "polars"="py",
114+
"pandas"=, "cudf"=, "spark"=, "pydatatable"=, "modin"=, "dask"=, "polars"="py", "datafusion"="py",
115115
"clickhouse"="sql",
116116
"juliadf"="jl", "bodo"="py"
117117
)

datafusion/groupby-datafusion.py

Lines changed: 310 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,310 @@
#!/usr/bin/env python

print("# groupby-datafusion.py", flush=True)

import os
import gc
import timeit
import datafusion as df
from datafusion import functions as f
from datafusion import col
from pyarrow import csv as pacsv
from pyarrow import parquet as paparquet

# Pull the shared benchmark helpers into module scope (names such as
# write_log, memory_usage and make_chk are used below and presumably come
# from this file -- TODO confirm against _helpers/helpers.py).
exec(open("./_helpers/helpers.py").read())
def ans_shape(batches):
    """Return (total_rows, n_columns) for a list of Arrow record batches.

    Rows are summed across batches; all non-empty batches must agree on the
    column count (checked with an assert). An empty list yields (0, 0).
    """
    total_rows = 0
    width = 0
    for batch in batches:
        total_rows += batch.num_rows
        if width:
            assert width == batch.num_columns
        else:
            width = batch.num_columns
    return total_rows, width
# Benchmark metadata recorded by write_log().  The version is pinned by hand;
# `df.__version__` was commented out upstream -- TODO confirm the wheel
# actually lacks it.
ver = "6.0.0"
git = ""
task = "groupby"
solution = "datafusion"
fun = ".groupby"
cache = "TRUE"
on_disk = "FALSE"

data_name = os.environ['SRC_DATANAME']
data_format = os.environ['SRC_FORMAT']

# Parquet input is a partitioned directory; anything else is a single CSV.
if data_format.lower() == 'parquet':
    src_grp = os.path.join(os.getcwd(), "data", data_name + "_partitioned/")
else:
    src_grp = os.path.join(os.getcwd(), "data", data_name + ".csv")
print("loading dataset %s" % src_grp, flush=True)

task_init = timeit.default_timer()
if data_format.lower() == 'parquet':
    data = paparquet.read_table(src_grp)
else:
    # auto_dict_encode lets pyarrow dictionary-encode the string id columns.
    data = pacsv.read_csv(src_grp, convert_options=pacsv.ConvertOptions(auto_dict_encode=True))
print(f"done reading base dataframe in {timeit.default_timer() - task_init}")

# Register the in-memory batches as SQL table "x" for all queries below.
ctx = df.ExecutionContext()
ctx.register_record_batches("x", [data.to_batches()])

in_rows = data.num_rows
print(in_rows, flush=True)

task_init = timeit.default_timer()
print("grouping...", flush=True)
# ---------------------------------------------------------------------------
# Benchmark questions.  The original file repeated a ~13-line stanza twice per
# question (16 copies); it is factored into _run_question() below.  q6 and q9
# are absent, exactly as in the original script.
# ---------------------------------------------------------------------------

def _run_question(question, sql, chk_cols, run):
    """Execute one timed benchmark query against the registered table "x".

    Parameters
    ----------
    question : str
        Human-readable label recorded via write_log().
    sql : str
        Query text executed through the DataFusion context ``ctx``.
    chk_cols : list[str]
        Result columns whose sums form the answer checksum.  A single column
        is read back directly from the Arrow batch; several columns go
        through pandas/numpy -- mirroring the original per-question code.
    run : int
        1 or 2; each question is timed twice and only the first run prints
        the "Running:" banner.
    """
    gc.collect()
    ordinal = "1st" if run == 1 else "2nd"
    if run == 1:
        print("\nRunning: " + question, flush=True)

    t_start = timeit.default_timer()
    ans = ctx.sql(sql).collect()
    shape = ans_shape(ans)
    print(f"Finished {ordinal} run grouping in {timeit.default_timer() - t_start}")
    # NOTE: t deliberately includes ans_shape() and the print above, matching
    # the original measurement points.
    t = timeit.default_timer() - t_start
    m = memory_usage()  # from the exec'd _helpers/helpers.py

    t_start = timeit.default_timer()
    # Local name: the original rebound the module alias `df` here, shadowing
    # `import datafusion as df` for the rest of the script.
    ans_df = ctx.create_dataframe([ans])
    aggs = [f.sum(col(c)) for c in chk_cols]
    if len(chk_cols) == 1:
        chk = ans_df.aggregate([], aggs).collect()[0].column(0)[0]
    else:
        chk = ans_df.aggregate([], aggs).collect()[0].to_pandas().to_numpy()[0]
    chkt = timeit.default_timer() - t_start
    print(f"Finished {ordinal} run aggregation in {chkt}")

    write_log(task=task, data=data_name, in_rows=in_rows, question=question,
              out_rows=shape[0], out_cols=shape[1], solution=solution,
              version=ver, git=git, fun=fun, run=run, time_sec=t, mem_gb=m,
              cache=cache, chk=make_chk([chk]), chk_time_sec=chkt,
              on_disk=on_disk)
    del ans


_QUESTIONS = [
    # q1
    ("sum v1 by id1",
     "SELECT id1, SUM(v1) AS v1 FROM x GROUP BY id1",
     ["v1"]),
    # q2
    ("sum v1 by id1:id2",
     "SELECT id1, id2, SUM(v1) AS v1 FROM x GROUP BY id1, id2",
     ["v1"]),
    # q3
    ("sum v1 mean v3 by id3",
     "SELECT id3, SUM(v1) AS v1, AVG(v3) AS v3 FROM x GROUP BY id3",
     ["v1", "v3"]),
    # q4
    ("mean v1:v3 by id4",
     "SELECT id4, AVG(v1) AS v1, AVG(v2) AS v2, AVG(v3) AS v3 FROM x GROUP BY id4",
     ["v1", "v2", "v3"]),
    # q5
    ("sum v1:v3 by id6",
     "SELECT id6, SUM(v1) AS v1, SUM(v2) AS v2, SUM(v3) AS v3 FROM x GROUP BY id6",
     ["v1", "v2", "v3"]),
    # q7
    ("max v1 - min v2 by id3",
     "SELECT id3, MAX(v1) - MIN(v2) AS range_v1_v2 FROM x GROUP BY id3",
     ["range_v1_v2"]),
    # q8
    ("largest two v3 by id6",
     "SELECT id6, v3 from (SELECT id6, v3, row_number() OVER (PARTITION BY id6 ORDER BY v3 DESC) AS row FROM x) t WHERE row <= 2",
     ["v3"]),
    # q10
    ("sum v3 count by id1:id6",
     "SELECT id1, id2, id3, id4, id5, id6, SUM(v3) as v3, COUNT(*) AS cnt FROM x GROUP BY id1, id2, id3, id4, id5, id6",
     ["v3", "cnt"]),
]

for _question, _sql, _chk_cols in _QUESTIONS:
    _run_question(_question, _sql, _chk_cols, run=1)
    _run_question(_question, _sql, _chk_cols, run=2)

print("grouping finished, took %0.fs" % (timeit.default_timer() - task_init), flush=True)

exit(0)

0 commit comments

Comments (0)