-
Notifications
You must be signed in to change notification settings - Fork 24
Expand file tree
/
Copy pathparam_grid.py
More file actions
57 lines (43 loc) · 1.59 KB
/
param_grid.py
File metadata and controls
57 lines (43 loc) · 1.59 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
from datetime import date
import numpy as np
import pandas as pd
from dateutil.relativedelta import relativedelta
from ploomber import DAG
from ploomber.executors import Serial
from ploomber.tasks import PythonCallable, TaskGroup
from ploomber.products import File
from ploomber.util import ParamGrid, Interval
def get_data(product, dates):
    """
    Write a table of random daily values to ``product`` as a parquet file.

    Placeholder logic: in a real pipeline this task would usually pull the
    data from a database instead of generating it.

    ``dates[0]`` and ``dates[1]`` are used as the start and end of the date
    range. One row is produced per day; the start day is included and the
    end day is left out.
    """
    start, end = dates[0], dates[1]
    # inclusive='left' keeps the start bound and drops the end bound
    days = pd.date_range(start=start, end=end, freq='D', inclusive='left')
    # One random value per generated day
    random_values = np.random.rand(len(days))
    frame = pd.DataFrame({'dates': days, 'values': random_values})
    # The output location is whatever str(product) evaluates to
    frame.to_parquet(str(product))
# Pipeline object that the generated tasks get attached to
dag = DAG()

# NOTE: the serial, in-process executor is only needed for testing purposes
dag.executor = Serial(build_in_subprocess=False)

# Bounds of the full period and the step used to walk through it
start_date = date(2010, 1, 1)
end_date = date(2019, 6, 1)
delta = relativedelta(years=1)

# Single-key grid: every entry carries a 'dates' value whose first two
# elements are read as range start and end by get_data and namer.
# NOTE(review): presumably Interval splits [start_date, end_date] into
# consecutive one-year (start, end) pairs - confirm against ploomber docs
params_array = ParamGrid(
    dict(dates=Interval(start_date, end_date, delta))
).zip()
def namer(params):
    """
    Build a task name from one parameter set.

    The first two entries of ``params['dates']`` are converted with ``str``,
    their dashes are replaced by underscores, and both are appended to the
    ``get_data_`` prefix (e.g. ``get_data_2010_01_01_2011_01_01``).
    """
    bounds = params['dates']
    # Only positions 0 and 1 are used; any further entries are ignored
    parts = [str(bound).replace('-', '_') for bound in (bounds[0], bounds[1])]
    return 'get_data_{}_{}'.format(*parts)
# Generate the tasks from params_array: each one is a PythonCallable that
# runs get_data, is named by namer and has a File product.
# NOTE(review): presumably one task is created per parameter set and
# '{{name}}' in the product path is filled in with the task name - confirm
# against the TaskGroup.from_params documentation
TaskGroup.from_params(
    task_class=PythonCallable,
    product_class=File,
    product_primitive='products/{{name}}.parquet',
    task_kwargs=dict(source=get_data),
    dag=dag,
    params_array=params_array,
    namer=namer,
)

# Plot the pipeline first, then build it
dag.plot()
dag.build()