"""
Basic pipeline
==============
This example illustrates ploomber's core features with a typical data pipeline
scenario: dump data from a database and apply a transformation to it. We use
SQLite for this example, but ploomber supports any database that SQLAlchemy
supports without code changes.
"""
from pathlib import Path
import tempfile
import pandas as pd
from sqlalchemy import create_engine
from ploomber import DAG
from ploomber.products import File
from ploomber.tasks import PythonCallable, SQLDump
from ploomber.clients import SQLAlchemyClient
from ploomber.executors import Serial
###############################################################################
# This first part just exports some sample data to a database:
tmp_dir = Path(tempfile.mkdtemp())
uri = 'sqlite:///' + str(tmp_dir / 'example.db')
engine = create_engine(uri)
df = pd.DataFrame({'a': [1, 2, 3, 4, 5]})
df.to_sql('example', engine)
###############################################################################
# There are three core concepts in ``ploomber``: :class:`Tasks <ploomber.tasks>`,
# :class:`Products <ploomber.products>` and :class:`DAGs <ploomber.DAG>`. Tasks
# are units of work that generate Products (persistent changes, such as a file
# on disk or a table in a database). Tasks are organized into a DAG, which
# keeps track of the declared dependencies among them.
dag = DAG(executor=Serial(build_in_subprocess=False))
# the first task dumps data from the db to the local filesystem
task_dump = SQLDump('SELECT * FROM example',
File(tmp_dir / 'example.csv'),
dag,
name='dump',
client=SQLAlchemyClient(uri),
chunksize=None)
# since this task will have an upstream dependency, it has to accept the
# upstream parameter
def _add_one(upstream, product):
"""Add one to column a
"""
df = pd.read_csv(str(upstream['dump']))
df['a'] = df['a'] + 1
df.to_csv(str(product), index=False)
# we convert the Python function into a Task
task_add_one = PythonCallable(_add_one,
File(tmp_dir / 'add_one.csv'),
dag,
name='add_one')
# declare how tasks relate to each other: first dump then add one
task_dump >> task_add_one
# plot the workflow; pending tasks are shown in red
dag.plot()
# run our sample pipeline
dag.build()
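###############################################################################
# To make the three concepts concrete, the following plain-Python sketch
# mimics what a DAG does when it builds. It orders tasks so that upstream
# tasks run first, then calls each task with an ``upstream`` mapping
# (upstream task name to product) and its own ``product``, like ``_add_one``
# above. This is a simplified illustration, not ploomber's implementation:
# ``sketch_build`` and the other ``sketch_*`` names are hypothetical, products
# are plain file paths, and ``graphlib`` requires Python 3.9 or later.
import tempfile
from graphlib import TopologicalSorter
from pathlib import Path


def sketch_build(tasks, dependencies):
    """Run every task after its upstream tasks (hypothetical helper).

    tasks: task name -> (function, product path)
    dependencies: task name -> set of upstream task names
    Returns the task names in the order they ran.
    """
    graph = {name: set(dependencies.get(name, ())) for name in tasks}
    order = list(TopologicalSorter(graph).static_order())
    for name in order:
        function, product = tasks[name]
        # map each upstream task name to the location of its product
        upstream = {up: tasks[up][1] for up in graph[name]}
        function(upstream=upstream, product=product)
    return order


def sketch_dump(upstream, product):
    # stand-in for the SQLDump task: write the sample column to a CSV file
    Path(product).write_text('a\n1\n2\n3\n4\n5\n')


def sketch_add_one(upstream, product):
    # read the upstream product and add one to every value in column a
    rows = Path(upstream['dump']).read_text().splitlines()[1:]
    values = [int(row) + 1 for row in rows]
    Path(product).write_text('a\n' + '\n'.join(map(str, values)) + '\n')


sketch_dir = Path(tempfile.mkdtemp())
# add_one is listed first on purpose: the order comes from the dependencies
sketch_tasks = {
    'add_one': (sketch_add_one, sketch_dir / 'add_one.csv'),
    'dump': (sketch_dump, sketch_dir / 'dump.csv'),
}
# declare the dependency: add_one runs after dump
sketch_order = sketch_build(sketch_tasks, {'add_one': {'dump'}})
print(sketch_order)  # ['dump', 'add_one']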
###############################################################################
# Each time the DAG runs, it saves the current timestamp and the source code
# of each task. The next time it runs, it only executes the tasks needed to
# bring everything up-to-date, following a simple rule: a task runs if its
# code (or the code of any of its upstream dependencies) has changed since
# the last time it ran.
#
# Data processing pipelines consist of many small, often long-running tasks
# that depend on each other. During early development, things are expected to
# change: new tasks are added and bugs are fixed. Triggering a full end-to-end
# run on each change is wasteful. Because ploomber saves each task's source
# code after a successful run, running the pipeline again skips the tasks
# that are not affected by the changes.

# the pipeline is up-to-date, so this call does not run any task
dag.build()
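###############################################################################
# The skipping rule can be sketched in a few lines of plain Python: a task is
# outdated if the hash of its current source code differs from the hash
# recorded after its last successful run, or if any of its upstream tasks is
# outdated. This is a simplified illustration, not ploomber's bookkeeping
# (which, as described above, also stores timestamps); ``sketch_outdated``
# and the in-memory ``recorded`` dictionary are hypothetical stand-ins for
# the metadata ploomber saves.
import hashlib
from graphlib import TopologicalSorter


def sketch_hash(source):
    # fingerprint of a task's source code
    return hashlib.sha256(source.encode('utf-8')).hexdigest()


def sketch_outdated(sources, dependencies, recorded):
    """Return the names of the tasks that need to run (hypothetical helper).

    sources: task name -> current source code
    dependencies: task name -> set of upstream task names
    recorded: task name -> source hash saved after the last successful run
    """
    graph = {name: set(dependencies.get(name, ())) for name in sources}
    outdated = set()
    # visit upstream tasks first so their status is known when it is needed
    for name in TopologicalSorter(graph).static_order():
        # a task that never ran has no recorded hash, so it counts as changed
        code_changed = recorded.get(name) != sketch_hash(sources[name])
        if code_changed or graph[name] & outdated:
            outdated.add(name)
    return outdated


sketch_sources = {'dump': 'SELECT * FROM example',
                  'add_one': "df['a'] = df['a'] + 1"}
sketch_deps = {'add_one': {'dump'}}
# pretend both tasks ran successfully with their current code
sketch_recorded = {name: sketch_hash(code)
                   for name, code in sketch_sources.items()}
print(sketch_outdated(sketch_sources, sketch_deps, sketch_recorded))  # set()

# editing the query outdates dump and, through the dependency, add_one
sketch_sources['dump'] = 'SELECT a FROM example'
print(sorted(sketch_outdated(sketch_sources, sketch_deps, sketch_recorded)))
# ['add_one', 'dump']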
###############################################################################
# Inspecting a pipeline
# *********************
#
# Many data pipelines start as experimental projects (e.g. developing a
# machine learning model), which causes them to grow unpredictably. As a
# pipeline evolves, it can span dozens of files whose intent is unclear. The
# DAG object serves as the primary reference for anyone seeking to understand
# the pipeline.
#
# Making a pipeline transparent helps others understand it quickly without
# going through the code details, and it eases debugging for developers.

# status() returns a summary of each task's status
dag.status()
###############################################################################
# Inspecting the ``DAG`` object
# -----------------------------
#
# Much data work is done interactively in Jupyter or similar tools, so being
# able to interact with a pipeline in the same way is an effective way to
# experiment with new methods.

# say you are adding a new method to task add_one: you can run just that task,
# with its upstream dependencies taken care of by the DAG (force=True runs it
# even if it is up-to-date)
dag['add_one'].build(force=True)
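###############################################################################
# Running a single task works because the DAG already knows where each
# upstream product lives, so it can hand those locations to the task without
# re-running the upstream tasks. The hypothetical ``sketch_build_one`` helper
# below illustrates that idea and the role of a ``force`` flag. To keep it
# short, it treats a task as up-to-date whenever its product exists, which is
# a simplification of the source-code check described earlier.
import tempfile
from pathlib import Path


def sketch_build_one(name, tasks, dependencies, force=False):
    """Run one task, reusing existing upstream products (hypothetical).

    Returns True if the task ran, False if it was skipped.
    """
    function, product = tasks[name]
    if not force and Path(product).exists():
        return False  # treated as up-to-date: nothing to do
    upstream = {up: tasks[up][1] for up in dependencies.get(name, ())}
    # upstream tasks are not re-run, so their products must already exist
    missing = [up for up, path in upstream.items() if not Path(path).exists()]
    if missing:
        raise FileNotFoundError(f'missing upstream products: {missing}')
    function(upstream=upstream, product=product)
    return True


def sketch_double(upstream, product):
    # read the upstream value and write twice that value
    value = int(Path(upstream['source']).read_text())
    Path(product).write_text(str(value * 2))


sketch_tmp = Path(tempfile.mkdtemp())
(sketch_tmp / 'source.txt').write_text('21')  # upstream product on disk
sketch_single_tasks = {
    'source': (None, sketch_tmp / 'source.txt'),  # never run in this sketch
    'double': (sketch_double, sketch_tmp / 'double.txt'),
}
sketch_single_deps = {'double': {'source'}}

print(sketch_build_one('double', sketch_single_tasks, sketch_single_deps))
# True: the product did not exist yet
print(sketch_build_one('double', sketch_single_tasks, sketch_single_deps))
# False: the product exists, so the task is skipped
print(sketch_build_one('double', sketch_single_tasks, sketch_single_deps,
                       force=True))
# True: force=True runs the task anyway
print((sketch_tmp / 'double.txt').read_text())  # 42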
###############################################################################
# Avoid hardcoding paths to files by loading them directly from the DAG.
# Casting a Task to a str returns a valid representation of its product: in
# this case the product is a File, so it returns the path to the file; for
# SQL relations, str(product) returns "schema"."name". Loading products this
# way means you never hardcode paths and, given your pipeline definition,
# you always read from the right place.

# explore the results of add_one
df = pd.read_csv(str(dag['add_one']))
df
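###############################################################################
# The ``str(dag['add_one'])`` idiom relies on a task delegating its string
# representation to its product. The classes below are hypothetical
# stand-ins written only to illustrate that design; they are not ploomber's
# ``File``, ``Task`` or ``DAG`` classes. A file product turns into its path,
# a SQL relation into ``"schema"."name"``, and code that reads results asks
# the DAG for the location instead of repeating it.


class SketchFile:
    def __init__(self, path):
        self.path = str(path)

    def __str__(self):
        return self.path


class SketchRelation:
    def __init__(self, schema, name):
        self.schema = schema
        self.name = name

    def __str__(self):
        # quoted identifiers, e.g. "public"."summary"
        return f'"{self.schema}"."{self.name}"'


class SketchTask:
    def __init__(self, name, product):
        self.name = name
        self.product = product

    def __str__(self):
        # casting a task to str returns its product's representation
        return str(self.product)


class SketchDAG:
    def __init__(self):
        self._tasks = {}

    def add(self, task):
        self._tasks[task.name] = task

    def __getitem__(self, name):
        # look tasks up by name, as in dag['add_one']
        return self._tasks[name]


sketch_dag = SketchDAG()
sketch_dag.add(SketchTask('add_one', SketchFile('output/add_one.csv')))
sketch_dag.add(SketchTask('summary', SketchRelation('public', 'summary')))
print(str(sketch_dag['add_one']))  # output/add_one.csv
print(str(sketch_dag['summary']))  # "public"."summary"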