7 changes: 5 additions & 2 deletions .github/workflows/regression.yml
@@ -17,7 +17,7 @@ jobs:
strategy:
fail-fast: false
matrix:
-solution: [data.table, collapse, dplyr, pandas, pydatatable, spark, juliadf, juliads, polars, arrow, duckdb, duckdb-latest, datafusion]
+solution: [data.table, collapse, dplyr, pandas, pydatatable, spark, juliadf, juliads, polars, arrow, duckdb, duckdb-latest, datafusion, dask]
name: Regression Tests solo solutions
runs-on: ubuntu-20.04
env:
@@ -46,6 +46,10 @@ jobs:
shell: bash
run: rm time.csv logs.csv

+- name: Update virtualenv
+shell: bash
+run: python3 -m pip install virtualenv
+
- name: Install all solutions
shell: bash
run: source path.env && python3 _utils/install_all_solutions.py ${{ matrix.solution }}
Expand Down Expand Up @@ -167,4 +171,3 @@ jobs:
name: all-out.zip
path: all-out.zip
if-no-files-found: error
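
Two changes in this workflow: dask joins the solo-solutions matrix, and a step installs virtualenv before the solutions are set up, since dask's setup script (setup-dask.sh, below) shells out to virtualenv, which cannot be assumed to be present on the runner. A minimal sketch of the same guard in Python; the guard itself is illustrative, not part of the workflow:

import shutil
import subprocess
import sys

# mirror the new workflow step: ensure `virtualenv` exists before the
# per-solution setup scripts run
if shutil.which("virtualenv") is None:
    subprocess.check_call([sys.executable, "-m", "pip", "install", "virtualenv"])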

2 changes: 1 addition & 1 deletion _utils/prep_solutions.py
@@ -5,7 +5,7 @@
SOLUTIONS_FILENAME = "_control/solutions.csv"
RUN_CONF_FILENAME = "run.conf"

SKIPPED_SOLUTIONS = ["clickhouse", "dask"]
SKIPPED_SOLUTIONS = ["clickhouse"]


def print_usage():
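
Dropping "dask" from SKIPPED_SOLUTIONS means prep_solutions.py prepares dask runs from _control/solutions.csv again. The surrounding function bodies are not shown in this diff, so the snippet below is only an illustration of how a skip list like this is typically applied, not the file's actual code:

import csv

SKIPPED_SOLUTIONS = ["clickhouse"]

def read_solutions(path="_control/solutions.csv"):
    # hypothetical helper: keep every solution not on the skip list,
    # so "dask" rows now pass through
    with open(path, newline="") as f:
        return [r for r in csv.DictReader(f)
                if r["solution"] not in SKIPPED_SOLUTIONS]
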
303 changes: 162 additions & 141 deletions dask/groupby-dask2.py
@@ -23,8 +23,6 @@
from dask import distributed
# we use process-pool instead of thread-pool due to GIL cost
client = distributed.Client(processes=True, silence_logs=logging.ERROR)
-# since we are running on local cluster of processes, we would prefer to keep the communication between workers to relative minimum, thus it's better to trade some tasks granularity for better processing locality
-dk.config.set({"optimization.fuse.ave-width": 20})

data_name = os.environ['SRC_DATANAME']
on_disk = False #data_name.split("_")[1] == "1e9" # on-disk data storage #126
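
The two deleted lines dropped a task-fusion knob that traded task granularity for locality on the local process cluster. dask.config.set is the public API for options like this, but whether optimization.fuse.ave-width still has any effect depends on the dask version, which is presumably why the PR removes it outright. If it is ever wanted again, a scoped form avoids leaking the setting globally (a sketch):

import dask

# apply the old fusion width around one computation only, then restore defaults
with dask.config.set({"optimization.fuse.ave-width": 20}):
    pass  # build graphs and call .compute() here
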
Expand All @@ -38,8 +36,12 @@
exit(0) # not yet implemented #171, currently groupby's dropna=False argument is ignored

print("using disk memory-mapped data storage" if on_disk else "using in-memory data storage", flush=True)
-#x = dd.read_parquet(src_grp, engine="fastparquet") if on_disk else
-x = dd.read_csv(src_grp, dtype={"id1":"category","id2":"category","id3":"category","id4":"Int32","id5":"Int32","id6":"Int32","v1":"Int32","v2":"Int32","v3":"float64"})
+#x = dd.read_parquet(src_grp, engine="pyarrow") if on_disk else
+x = dd.read_csv(
+    src_grp,
+    dtype={"id1":"category","id2":"category","id3":"category","id4":"Int32","id5":"Int32","id6":"Int32","v1":"Int32","v2":"Int32","v3":"float64"},
+    engine="pyarrow"
+)

x = x.persist()
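
dd.read_csv forwards these keywords to pandas.read_csv for every partition, so engine="pyarrow" selects pandas' multithreaded PyArrow CSV parser (pandas 1.4+ with pyarrow installed), while the dtype map keeps the id columns categorical and nullable-integer; x.persist() then materializes the parsed frame in worker memory before any timing starts. The per-partition equivalent looks roughly like this, with a placeholder file name:

import pandas as pd

df = pd.read_csv(
    "G1_1e7_1e2_0_0.csv",  # placeholder for one chunk of src_grp
    dtype={"id1": "category", "id2": "category", "id3": "category",
           "id4": "Int32", "id5": "Int32", "id6": "Int32",
           "v1": "Int32", "v2": "Int32", "v3": "float64"},
    engine="pyarrow",
)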

@@ -189,147 +191,166 @@
print(ans.tail(3), flush=True)
del ans

#question = "median v3 sd v3 by id4 id5" # q6 # median function not yet implemented: https://github.com/dask/dask/issues/4362
#gc.collect()
#t_start = timeit.default_timer()
#ans = x.groupby(['id4','id5'], dropna=False, observed=True).agg({'v3': ['median','std']}).compute()
#ans.reset_index(inplace=True)
#print(ans.shape, flush=True)
#t = timeit.default_timer() - t_start
#m = memory_usage()
#t_start = timeit.default_timer()
#chk = [ans['v3']['median'].sum(), ans['v3']['std'].sum()]
#chkt = timeit.default_timer() - t_start
#write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
#del ans
#gc.collect()
#t_start = timeit.default_timer()
#ans = x.groupby(['id4','id5'], dropna=False, observed=True).agg({'v3': ['median','std']}).compute()
#ans.reset_index(inplace=True)
#print(ans.shape, flush=True)
#t = timeit.default_timer() - t_start
#m = memory_usage()
#t_start = timeit.default_timer()
#chk = [ans['v3']['median'].sum(), ans['v3']['std'].sum()]
#chkt = timeit.default_timer() - t_start
#write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
#print(ans.head(3), flush=True)
#print(ans.tail(3), flush=True)
#del ans
question = "median v3 sd v3 by id4 id5" # q6
gc.collect()
t_start = timeit.default_timer()
ans = x.groupby(['id4','id5'], dropna=False, observed=True).agg({'v3': ['median','std']}, shuffle='p2p').compute()
ans.reset_index(inplace=True)
print(ans.shape, flush=True)
t = timeit.default_timer() - t_start
m = memory_usage()
t_start = timeit.default_timer()
chk = [ans['v3']['median'].sum(), ans['v3']['std'].sum()]
chkt = timeit.default_timer() - t_start
write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
del ans
gc.collect()
t_start = timeit.default_timer()
ans = x.groupby(['id4','id5'], dropna=False, observed=True).agg({'v3': ['median','std']}, shuffle='p2p').compute()
ans.reset_index(inplace=True)
print(ans.shape, flush=True)
t = timeit.default_timer() - t_start
m = memory_usage()
t_start = timeit.default_timer()
chk = [ans['v3']['median'].sum(), ans['v3']['std'].sum()]
chkt = timeit.default_timer() - t_start
write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
print(ans.head(3), flush=True)
print(ans.tail(3), flush=True)
del ans
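
q6 is re-enabled: a median is a holistic aggregate, so all values of a group must land on one worker, and shuffle='p2p' requests distributed's peer-to-peer shuffle for that rearrangement. Recent dask releases can also select the method globally instead of per call; the config key below is the newer spelling and has moved between releases, so treat it as a sketch:

import dask

# prefer the p2p shuffle for all dataframe shuffles (spelling in recent dask)
dask.config.set({"dataframe.shuffle.method": "p2p"})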

# question = "max v1 - min v2 by id3" # q7
# gc.collect()
# t_start = timeit.default_timer()
# ans = x.groupby('id3', dropna=False, observed=True).agg({'v1':'max', 'v2':'min'}).assign(range_v1_v2=lambda x: x['v1']-x['v2'])[['range_v1_v2']].compute()
# ans.reset_index(inplace=True)
# print(ans.shape, flush=True)
# t = timeit.default_timer() - t_start
# m = memory_usage()
# t_start = timeit.default_timer()
# chk = [ans['range_v1_v2'].sum()]
# chkt = timeit.default_timer() - t_start
# write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
# del ans
# gc.collect()
# t_start = timeit.default_timer()
# ans = x.groupby('id3', dropna=False, observed=True).agg({'v1':'max', 'v2':'min'}).assign(range_v1_v2=lambda x: x['v1']-x['v2'])[['range_v1_v2']].compute()
# ans.reset_index(inplace=True)
# print(ans.shape, flush=True)
# t = timeit.default_timer() - t_start
# m = memory_usage()
# t_start = timeit.default_timer()
# chk = [ans['range_v1_v2'].sum()]
# chkt = timeit.default_timer() - t_start
# write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
# print(ans.head(3), flush=True)
# print(ans.tail(3), flush=True)
# del ans
question = "max v1 - min v2 by id3" # q7
gc.collect()
t_start = timeit.default_timer()
ans = x.groupby('id3', dropna=False, observed=True).agg({'v1':'max', 'v2':'min'}).assign(range_v1_v2=lambda x: x['v1']-x['v2'])[['range_v1_v2']].compute()
ans.reset_index(inplace=True)
print(ans.shape, flush=True)
t = timeit.default_timer() - t_start
m = memory_usage()
t_start = timeit.default_timer()
chk = [ans['range_v1_v2'].sum()]
chkt = timeit.default_timer() - t_start
write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
del ans
gc.collect()
t_start = timeit.default_timer()
ans = x.groupby('id3', dropna=False, observed=True).agg({'v1':'max', 'v2':'min'}).assign(range_v1_v2=lambda x: x['v1']-x['v2'])[['range_v1_v2']].compute()
ans.reset_index(inplace=True)
print(ans.shape, flush=True)
t = timeit.default_timer() - t_start
m = memory_usage()
t_start = timeit.default_timer()
chk = [ans['range_v1_v2'].sum()]
chkt = timeit.default_timer() - t_start
write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
print(ans.head(3), flush=True)
print(ans.tail(3), flush=True)
del ans
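
q7, now active, aggregates max(v1) and min(v2) per id3 and derives the range with assign. The identical chain on plain pandas, as a toy check:

import pandas as pd

df = pd.DataFrame({"id3": ["a", "a", "b"], "v1": [5, 9, 2], "v2": [1, 4, 2]})
rng = (df.groupby("id3")
         .agg({"v1": "max", "v2": "min"})
         .assign(range_v1_v2=lambda d: d["v1"] - d["v2"])[["range_v1_v2"]])
print(rng)  # a -> 8, b -> 0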

#question = "largest two v3 by id6" # q8
#gc.collect()
#t_start = timeit.default_timer()
#ans = x[~x['v3'].isna()][['id6','v3']].groupby('id6', dropna=False, observed=True).apply(lambda x: x.nlargest(2, columns='v3'), meta={'id6':'Int64', 'v3':'float64'})[['v3']].compute()
#ans.reset_index(level='id6', inplace=True)
#ans.reset_index(drop=True, inplace=True) # drop because nlargest creates some extra new index field
#print(ans.shape, flush=True)
#t = timeit.default_timer() - t_start
#m = memory_usage()
#t_start = timeit.default_timer()
#chk = [ans['v3'].sum()]
#chkt = timeit.default_timer() - t_start
#write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
#del ans
#gc.collect()
#t_start = timeit.default_timer()
#ans = x[~x['v3'].isna()][['id6','v3']].groupby('id6', dropna=False, observed=True).apply(lambda x: x.nlargest(2, columns='v3'), meta={'id6':'Int64', 'v3':'float64'})[['v3']].compute()
#ans.reset_index(level='id6', inplace=True)
#ans.reset_index(drop=True, inplace=True)
#print(ans.shape, flush=True)
#t = timeit.default_timer() - t_start
#m = memory_usage()
#t_start = timeit.default_timer()
#chk = [ans['v3'].sum()]
#chkt = timeit.default_timer() - t_start
#write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
#print(ans.head(3), flush=True)
#print(ans.tail(3), flush=True)
#del ans
question = "largest two v3 by id6" # q8
gc.collect()
t_start = timeit.default_timer()
ans = x[~x['v3'].isna()][['id6','v3']].groupby('id6', dropna=False, observed=True).apply(lambda x: x.nlargest(2, columns='v3'), meta={'id6':'Int64', 'v3':'float64'})[['v3']].compute()
ans.reset_index(level='id6', inplace=True)
ans.reset_index(drop=True, inplace=True) # drop because nlargest creates some extra new index field
print(ans.shape, flush=True)
t = timeit.default_timer() - t_start
m = memory_usage()
t_start = timeit.default_timer()
chk = [ans['v3'].sum()]
chkt = timeit.default_timer() - t_start
write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
del ans
gc.collect()
t_start = timeit.default_timer()
ans = x[~x['v3'].isna()][['id6','v3']].groupby('id6', dropna=False, observed=True).apply(lambda x: x.nlargest(2, columns='v3'), meta={'id6':'Int64', 'v3':'float64'})[['v3']].compute()
ans.reset_index(level='id6', inplace=True)
ans.reset_index(drop=True, inplace=True)
print(ans.shape, flush=True)
t = timeit.default_timer() - t_start
m = memory_usage()
t_start = timeit.default_timer()
chk = [ans['v3'].sum()]
chkt = timeit.default_timer() - t_start
write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
print(ans.head(3), flush=True)
print(ans.tail(3), flush=True)
del ans
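
The meta= argument in q8 matters: groupby().apply() runs an arbitrary function, so dask cannot infer the output schema and would otherwise evaluate the lambda on dummy data to guess it. Passing a name-to-dtype mapping skips that step. A self-contained toy version:

import pandas as pd
import dask.dataframe as dd

pdf = pd.DataFrame({"id6": [1, 1, 2], "v3": [0.5, 2.5, 1.0]})
ddf = dd.from_pandas(pdf, npartitions=2)
top2 = (ddf.groupby("id6")
           .apply(lambda g: g.nlargest(2, columns="v3"),
                  meta={"id6": "int64", "v3": "float64"})
           .compute())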

#question = "regression v1 v2 by id2 id4" # q9
#gc.collect()
#t_start = timeit.default_timer()
#ans = x[['id2','id4','v1','v2']].groupby(['id2','id4'], dropna=False, observed=True).apply(lambda x: pd.Series({'r2': x.corr()['v1']['v2']**2}), meta={'r2':'float64'}).compute()
#ans.reset_index(inplace=True)
#print(ans.shape, flush=True)
#t = timeit.default_timer() - t_start
#m = memory_usage()
#t_start = timeit.default_timer()
#chk = [ans['r2'].sum()]
#chkt = timeit.default_timer() - t_start
#write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
#del ans
#gc.collect()
#t_start = timeit.default_timer()
#ans = x[['id2','id4','v1','v2']].groupby(['id2','id4'], dropna=False, observed=True).apply(lambda x: pd.Series({'r2': x.corr()['v1']['v2']**2}), meta={'r2':'float64'}).compute()
#ans.reset_index(inplace=True)
#print(ans.shape, flush=True)
#t = timeit.default_timer() - t_start
#m = memory_usage()
#t_start = timeit.default_timer()
#chk = [ans['r2'].sum()]
#chkt = timeit.default_timer() - t_start
#write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
#print(ans.head(3), flush=True)
#print(ans.tail(3), flush=True)
#del ans
question = "regression v1 v2 by id2 id4" # q9
gc.collect()
t_start = timeit.default_timer()
ans = x[['id2','id4','v1','v2']].groupby(['id2','id4'], dropna=False, observed=True).apply(lambda x: pd.Series({'r2': x.corr()['v1']['v2']**2}), meta={'r2':'float64'}).compute()
ans.reset_index(inplace=True)
print(ans.shape, flush=True)
t = timeit.default_timer() - t_start
m = memory_usage()
t_start = timeit.default_timer()
chk = [ans['r2'].sum()]
chkt = timeit.default_timer() - t_start
write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
del ans
gc.collect()
t_start = timeit.default_timer()
ans = x[['id2','id4','v1','v2']].groupby(['id2','id4'], dropna=False, observed=True).apply(lambda x: pd.Series({'r2': x.corr()['v1']['v2']**2}), meta={'r2':'float64'}).compute()
ans.reset_index(inplace=True)
print(ans.shape, flush=True)
t = timeit.default_timer() - t_start
m = memory_usage()
t_start = timeit.default_timer()
chk = [ans['r2'].sum()]
chkt = timeit.default_timer() - t_start
write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
print(ans.head(3), flush=True)
print(ans.tail(3), flush=True)
del ans
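
The "regression" in q9 is simply the squared Pearson correlation of v1 and v2 within each (id2, id4) group, which is what x.corr()['v1']['v2']**2 computes. The same statistic on plain pandas, a little more directly:

import pandas as pd

df = pd.DataFrame({"id2": ["a"] * 3 + ["b"] * 3,
                   "id4": [1] * 3 + [2] * 3,
                   "v1": [1.0, 2.0, 3.0, 1.0, 2.0, 4.0],
                   "v2": [2.0, 4.0, 6.1, 2.0, 1.0, 9.0]})
r2 = df.groupby(["id2", "id4"]).apply(
    lambda g: pd.Series({"r2": g["v1"].corr(g["v2"]) ** 2}))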

#question = "sum v3 count by id1:id6" # q10
#gc.collect()
#t_start = timeit.default_timer()
#ans = x.groupby(['id1','id2','id3','id4','id5','id6'], dropna=False, observed=True).agg({'v3':'sum', 'v1':'size'}).compute() # column name different than expected, ignore it because: ValueError: Metadata inference failed in `rename`: Original error is below: ValueError('Level values must be unique: [nan, nan] on level 0',)
#ans.reset_index(inplace=True)
#print(ans.shape, flush=True)
#t = timeit.default_timer() - t_start
#m = memory_usage()
#t_start = timeit.default_timer()
#chk = [ans.v3.sum(), ans.v1.sum()]
#chkt = timeit.default_timer() - t_start
#write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
#del ans
#gc.collect()
#t_start = timeit.default_timer()
#ans = x.groupby(['id1','id2','id3','id4','id5','id6'], dropna=False, observed=True).agg({'v3':'sum', 'v1':'size'}).compute()
#ans.reset_index(inplace=True)
#print(ans.shape, flush=True)
#t = timeit.default_timer() - t_start
#m = memory_usage()
#t_start = timeit.default_timer()
#chk = [ans.v3.sum(), ans.v1.sum()]
#chkt = timeit.default_timer() - t_start
#write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
#print(ans.head(3), flush=True)
#print(ans.tail(3), flush=True)
#del ans
question = "sum v3 count by id1:id6" # q10
print(question)
gc.collect()
t_start = timeit.default_timer()
ans = (
x.groupby(
['id1', 'id2', 'id3', 'id4', 'id5', 'id6'],
dropna=False,
observed=True,
)
.agg({'v3': 'sum', 'v1': 'size'}, split_out=x.npartitions)
.rename(columns={"v1": "count"})
.compute()
)
ans.reset_index(inplace=True)
print(ans.shape, flush=True)
t = timeit.default_timer() - t_start
m = memory_usage()
t_start = timeit.default_timer()
chk = [ans.v3.sum(), ans["count"].sum()]
chkt = timeit.default_timer() - t_start
write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
del ans
gc.collect()
t_start = timeit.default_timer()
ans = (
x.groupby(
['id1', 'id2', 'id3', 'id4', 'id5', 'id6'],
dropna=False,
observed=True,
)
.agg({'v3': 'sum', 'v1': 'size'}, split_out=x.npartitions)
.rename(columns={"v1": "count"})
.compute()
)
ans.reset_index(inplace=True)
print(ans.shape, flush=True)
t = timeit.default_timer() - t_start
m = memory_usage()
t_start = timeit.default_timer()
chk = [ans.v3.sum(), ans["count"].sum()]
chkt = timeit.default_timer() - t_start
write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
print(ans.head(3), flush=True)
print(ans.tail(3), flush=True)
del ans
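
Two details in the new q10: split_out=x.npartitions keeps the result of the six-key groupby spread over as many partitions as the input rather than funneling a very high-cardinality result into a single output partition, and renaming v1 to count sidesteps the metadata-inference failure quoted in the deleted comment. A toy illustration of split_out:

import pandas as pd
import dask.dataframe as dd

pdf = pd.DataFrame({"id1": list("abcd") * 2, "v3": range(8)})
ddf = dd.from_pandas(pdf, npartitions=4)
out = ddf.groupby("id1").agg({"v3": "sum"}, split_out=ddf.npartitions)
print(out.npartitions)  # 4, instead of the default single output partition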

print("grouping finished, took %0.fs" % (timeit.default_timer()-task_init), flush=True)

4 changes: 1 addition & 3 deletions dask/setup-dask.sh
@@ -1,13 +1,11 @@
#!/bin/bash
set -e

-virtualenv dask/py-dask --python=python3
+virtualenv dask/py-dask --python=python3.10
source dask/py-dask/bin/activate

# install binaries
python3 -m pip install "dask[complete]"
-python3 -m pip install pandas psutil
-python3 -m pip install distributed

# check
# python3
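
The setup script now pins the virtualenv interpreter to python3.10 and installs only "dask[complete]": that extra already pulls in pandas and distributed, and distributed in turn depends on psutil, so the separate pip lines became redundant, assuming that dependency chain holds for the installed release. A quick post-install sanity check to run inside the activated environment:

import sys
import dask, distributed, pandas, psutil  # expected to arrive via dask[complete]

assert sys.version_info[:2] == (3, 10), sys.version
print(dask.__version__, distributed.__version__)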