-
Notifications
You must be signed in to change notification settings - Fork 24
Expand file tree
/
Copy pathloader.py
More file actions
94 lines (69 loc) · 2.98 KB
/
loader.py
File metadata and controls
94 lines (69 loc) · 2.98 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
"""
Managing static files (SQL scripts, jupyter notebooks)
======================================================
How to manage non-Python source files using SourceLoader, this examples shows
how to load parametrized SQL scripts and make use of advanced jinja2 features
"""
###############################################################################
# Data pipelines code usually includes non-Python files (e.g. SQL scripts),
# SourceLoader provides a convenient way of loading them to avoid hardcoding
# paths to files and enables the use of advanced jinja2 features such as
# macros.
from pathlib import Path
import tempfile
import pandas as pd
from ploomber.clients import SQLAlchemyClient
from ploomber.tasks import SQLTransfer, SQLScript
from ploomber.products import SQLiteRelation
from ploomber.executors import Serial
from ploomber import DAG, SourceLoader
###############################################################################
# We first setup our sample environment, a sqlite database with some data
tmp_dir = Path(tempfile.mkdtemp())
client = SQLAlchemyClient('sqlite:///' + str(tmp_dir / 'my_db.db'))
df = pd.DataFrame({'x': range(10)})
# materialize the sample frame as a "data" table in the sqlite database
df.to_sql('data', client.engine)
###############################################################################
# We now simulate our code environment: a folder with SQL scripts, for
# simplicity, we are saving them in the same location as the data but in a
# real project we should keep code and data separate
_ = Path(tmp_dir, 'data_select.sql').write_text('SELECT * FROM data')
###############################################################################
# Under the hood SourceLoader initializes a jinja2.Environment which allows us
# to use features such as macros
_ = Path(tmp_dir, 'macros.sql').write_text("""
{% macro my_macro() -%}
-- This is a macro
{%- endmacro %}
""")
# this script imports the macro above and references its upstream dependency
# ("transfer") and its product via jinja2 placeholders
_ = Path(tmp_dir, 'subset_create.sql').write_text("""
{% from 'macros.sql' import my_macro %}
{{my_macro()}}
CREATE TABLE {{product}} AS
SELECT * FROM
{{upstream["transfer"]}} WHERE x > 1
""")
###############################################################################
# DAG declaration
dag = DAG(executor=Serial(build_in_subprocess=False))

# all task and product types talk to the same sqlite database
for registered_type in (SQLTransfer, SQLiteRelation, SQLScript):
    dag.clients[registered_type] = client

# SourceLoader resolves script names relative to tmp_dir
source_loader = SourceLoader(tmp_dir)

transfer_task = SQLTransfer(source_loader['data_select.sql'],
                            product=SQLiteRelation((None, 'data2', 'table')),
                            dag=dag,
                            name='transfer')
subset_task = SQLScript(source_loader['subset_create.sql'],
                        product=SQLiteRelation((None, 'subset', 'table')),
                        dag=dag,
                        name='subset')

# declare the dependency: subset runs after transfer
transfer_task >> subset_task

dag.render()
###############################################################################
# Our macro is correctly rendered:
print(dag['subset'].source)
###############################################################################
# Plot and execute pipeline:
dag.plot()
dag.build()