-
Notifications
You must be signed in to change notification settings - Fork 24
Expand file tree
/
Copy pathparametrizing.py
More file actions
108 lines (85 loc) · 3.37 KB
/
parametrizing.py
File metadata and controls
108 lines (85 loc) · 3.37 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
"""
Parametrized DAGs
=================
This example shows how to parametrize tasks that use the same source code.
"""
from pathlib import Path
import tempfile
import pandas as pd
from ploomber import DAG
from ploomber.tasks import PythonCallable, SQLUpload, SQLScript
from ploomber.products import File, SQLiteRelation
from ploomber.clients import SQLAlchemyClient
from ploomber.executors import Serial
# Work inside a throwaway directory so repeated runs don't collide.
tmp_dir = Path(tempfile.mkdtemp())
# SQLAlchemy URI for a file-backed SQLite database inside tmp_dir.
path_to_db = 'sqlite:///' + str(tmp_dir / 'my_db.db')
print('temporary dir: ', tmp_dir)
# Serial executor; build_in_subprocess=False runs tasks in this process
# rather than forking a subprocess per task.
dag = DAG(executor=Serial(build_in_subprocess=False))
# One shared client: every SQL task and SQL product in this DAG talks to
# the same SQLite database.
client = SQLAlchemyClient(path_to_db)
dag.clients[SQLUpload] = client
dag.clients[SQLiteRelation] = client
dag.clients[SQLScript] = client
def get_data(product, filename):
    """Download one wine-quality dataset and save it as parquet.

    ``filename`` selects the dataset (red or white); it is supplied via
    the task's ``params``, which is how the same function backs both the
    red and white tasks.
    """
    base = ('http://archive.ics.uci.edu/ml/machine-learning-databases/'
            'wine-quality/')
    # the UCI files are semicolon-separated
    frame = pd.read_csv(base + filename, sep=';', index_col=False)
    frame.to_parquet(str(product))
def concat_data(upstream, product):
    """Stack the red and white wine datasets into one parquet file.

    A ``kind`` column ('red' / 'white') is added so each row's origin
    survives the concatenation.
    """
    frames = []
    for kind in ('red', 'white'):
        frame = pd.read_parquet(str(upstream[kind]))
        frame['kind'] = kind
        frames.append(frame)
    pd.concat(frames).to_parquet(str(product))
###############################################################################
# red_task and white_task share the same source code (get_data); only the
# params differ — that is all it takes to parametrize a PythonCallable
_wine_tasks = {
    kind: PythonCallable(
        get_data,
        product=File(tmp_dir / (kind + '.parquet')),
        dag=dag,
        name=kind,
        params={'filename': 'winequality-' + kind + '.csv'},
    )
    for kind in ('red', 'white')
}
red_task = _wine_tasks['red']
white_task = _wine_tasks['white']

# combines both datasets into a single parquet file
concat_task = PythonCallable(
    concat_data,
    product=File(tmp_dir / 'all.parquet'),
    dag=dag,
    name='all',
)

# loads the combined parquet file into the SQLite database
upload_task = SQLUpload(
    tmp_dir / 'all.parquet',
    product=SQLiteRelation((None, 'data', 'table')),
    dag=dag,
    name='upload',
)
###############################################################################
# you can use jinja2 to parametrize SQL, {{upstream}} and {{product}}
# are available for your script. this way you could switch products without
# changing your source code (e.g. each Data Scientist in your team writes
# to his/her own db schema to have isolated runs)

# NOTE: the previous query compared pH against a bare AVG(pH) with no
# GROUP BY; in SQLite that collapses the whole result to a single row,
# so the output table would contain one row instead of one flag per
# input row. A scalar subquery computes the average once and preserves
# one output row per input row, which is what a per-row high_pH flag
# requires.
sql = """
CREATE TABLE {{product}} AS
SELECT *,
       pH > (SELECT AVG(pH) FROM {{upstream['upload']}}) AS high_pH
FROM {{upstream['upload']}}
"""
features = SQLScript(sql,
                     product=SQLiteRelation((None, 'features', 'table')),
                     dag=dag,
                     name='features')

# wire the dependency graph: red/white feed the concat step, whose output
# is uploaded to SQLite and then transformed into the features table
red_task >> concat_task
white_task >> concat_task
concat_task >> upload_task >> features
###############################################################################
# render will pass all parameters so you can see exactly which SQL code
# will be executed
dag.render()
###############################################################################
# print source code for task "features"
print(dag['features'].source)
# dag.plot()
# execute the pipeline: downloads the data, concatenates it, uploads it
# to SQLite and creates the features table
dag.build()