forked from mmarifat/dmd_postgres_parser
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
162 lines (133 loc) · 5.62 KB
/
main.py
File metadata and controls
162 lines (133 loc) · 5.62 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
import glob
import io
import os
import shutil
import time
import xml.etree.ElementTree as ET
import zipfile
import functions_framework
from loguru import logger
from services.cors import cors_enabled_header
from services.env import Env
from services.httpadapters import get_request_obj
from services.postgres import postgres_cnx
from services.trud_s3 import get_trud_s3_object
# Absolute path to the DDL file that creates the dm+d table structure,
# resolved relative to this module so it works regardless of the CWD.
# os.path.join (instead of "+ '/...'") keeps the path portable.
dmd_structure_path = os.path.join(
    os.path.dirname(os.path.realpath(__file__)), "dmd_structure.sql"
)
def monotabular(cursor, path, concept_name):
    """Parse a single-section dm+d XML file and INSERT every entry into
    the table ``"<concept_name>_<root tag lowercased>"``.

    The root element is itself the only section: each direct child is a
    row, and each grandchild is a column (tag = column name, text = value).

    Args:
        cursor: open database cursor used for the INSERTs (no commit here;
            the caller owns the transaction).
        path: filesystem path of the XML file to parse.
        concept_name: dm+d concept prefix used to build the table name.
    """
    tree = ET.parse(path)
    root = tree.getroot()
    table_name = str(root.tag).lower()
    logger.info(f"Populating table => {concept_name}_{table_name}")
    for entry in root:
        columns = [field.tag for field in entry]
        # field.text is None for empty elements; that inserts SQL NULL.
        values = [field.text for field in entry]
        if not columns:
            # An entry with no child fields would render invalid SQL
            # ('INSERT INTO "t" () VALUES ()'), so skip it.
            continue
        clmn = ", ".join(f'"{col}"' for col in columns)
        placeholders = ",".join(["%s"] * len(columns))
        # Identifiers can't be bound as parameters, hence the f-string for
        # the table/column names; the values themselves are parameterized.
        sql = f'INSERT INTO "{concept_name}_{table_name}" ({clmn}) VALUES ({placeholders})'
        cursor.execute(sql, values)
def polytabular(cursor, path, concept_name):
    """Parse a multi-section dm+d XML file and INSERT every entry of every
    section into the table ``"<concept_name>_<section tag lowercased>"``.

    Each direct child of the root is a section (one target table); within a
    section, each child is a row and each grandchild a column
    (tag = column name, text = value).

    Args:
        cursor: open database cursor used for the INSERTs (no commit here;
            the caller owns the transaction).
        path: filesystem path of the XML file to parse.
        concept_name: dm+d concept prefix used to build the table names.
    """
    tree = ET.parse(path)
    root = tree.getroot()
    for section in root:
        table_name = str(section.tag).lower()
        logger.info(f"Populating table => {table_name}")
        for entry in section:
            columns = [field.tag for field in entry]
            # field.text is None for empty elements; that inserts SQL NULL.
            values = [field.text for field in entry]
            if not columns:
                # An entry with no child fields would render invalid SQL
                # ('INSERT INTO "t" () VALUES ()'), so skip it.
                continue
            clmn = ", ".join(f'"{col}"' for col in columns)
            placeholders = ",".join(["%s"] * len(columns))
            # Identifiers can't be bound as parameters, hence the f-string
            # for table/column names; values are parameterized.
            sql = f'INSERT INTO "{concept_name}_{table_name}" ({clmn}) VALUES ({placeholders})'
            cursor.execute(sql, values)
# Ordered list of dm+d concept files to import. Each tuple is:
#   (filename glob template — "{}" is filled with "*" to match the
#    release date suffix,
#    concept prefix used for the target table names,
#    parser function: monotabular for single-section files,
#    polytabular for multi-section files).
# Order matters: lookup/ingredient tables are loaded before the
# concept tables that reference them.
concepts = [
    ("f_lookup2_3{}.xml", "lookup", polytabular),
    ("f_ingredient2_3{}.xml", "ingredient", monotabular),
    ("f_vtm2_3{}.xml", "vtm", monotabular),
    ("f_vmp2_3{}.xml", "vmp", polytabular),
    ("f_amp2_3{}.xml", "amp", polytabular),
    ("f_vmpp2_3{}.xml", "vmpp", polytabular),
    ("f_ampp2_3{}.xml", "ampp", polytabular),
]
def run(trud_s3_key: str):
    """Download a dm+d TRUD zip from S3, extract it, and load its XML
    content into a fresh Postgres schema named after the zip file.

    Args:
        trud_s3_key: S3 object key of the TRUD release zip.

    Returns:
        ``(message, http_status)`` tuple — 200 on success, 400 when the key
        yields no schema name or the object has no data, 500 on any other
        failure (the exception is logged, not re-raised).
    """
    cnx = None
    cursor = None
    try:
        start_time = time.time()
        # Schema name = zip base filename with dots replaced by
        # underscores, e.g. "nhsbsa_dmd_3.4.0.zip" -> "nhsbsa_dmd_3_4_0".
        # (The original wrapped this in str.format(), which would have
        # treated any "{...}" in a filename as a template placeholder.)
        schema_name = (
            trud_s3_key.split("/").pop().split(".zip")[0].replace(".", "_")
        )
        logger.info(f"Processing => {schema_name}")
        if not schema_name:
            logger.info("Skipping as no schema found.")
            return "No schema found.", 400
        trud_s3_zip_content = get_trud_s3_object(trud_s3_key)
        if not trud_s3_zip_content:
            logger.info(f"Skipping as no data found in storage => {schema_name}")
            return f"No trud data found for schema {schema_name}", 400
        logger.info(f"Extracting zip content => {schema_name}")
        zip_extract_dir = f"./extracts/{schema_name}"
        with zipfile.ZipFile(io.BytesIO(trud_s3_zip_content), "r") as zip_ref:
            zip_ref.extractall(zip_extract_dir)
        logger.info("Connecting to database...")
        cnx = postgres_cnx()
        cursor = cnx.cursor()
        logger.info(f"Creating schema => {schema_name}")
        cursor.execute(f'CREATE SCHEMA IF NOT EXISTS "{schema_name}";')
        cursor.execute(f'SET search_path TO "{schema_name}";')
        logger.info("Importing table structure...")
        with open(dmd_structure_path, "r") as f:
            cursor.execute(f.read())
        logger.info("Parsing xml...")
        # Resolve each concept file inside the extract dir by path instead
        # of os.chdir(): chdir is process-global and the original left the
        # working directory broken whenever an exception fired mid-loop.
        for pattern, concept_name, parser in concepts:
            matches = glob.glob(os.path.join(zip_extract_dir, pattern.format("*")))
            if not matches:
                # Clearer than the bare IndexError glob(...)[0] would raise.
                raise FileNotFoundError(
                    f"No file matching {pattern} found for schema {schema_name}"
                )
            logger.info(f"Parsing concept => {concept_name}")
            parser(cursor, matches[0], concept_name)
        logger.info(
            "Appending nhs queue list into 'medicine.nhs_medicine_api_data_process' table..."
        )
        insert_command = "INSERT INTO medicine.nhs_medicine_api_data_process (schema_name, status) VALUES (%s, %s);"
        cursor.execute(insert_command, (schema_name, 1))
        cnx.commit()
        shutil.rmtree(zip_extract_dir)
        elapsed_time = time.time() - start_time
        logger.info(f"Finished processing => {schema_name}")
        logger.info(f"Total time taken: {elapsed_time} seconds")
        return f"Schema: {schema_name}; Took {elapsed_time} seconds", 200
    except Exception as e:
        logger.exception(f"[DMD PARSER ERROR] {e}")
        return f"Error: {str(e)}", 500
    finally:
        # Always release DB resources — the original leaked the connection
        # (and its open transaction) whenever an exception was caught above.
        if cursor is not None:
            cursor.close()
        if cnx is not None:
            cnx.close()
@functions_framework.http
def dmd_postgres_parser(request):
    """HTTP entry point (Google Cloud Functions) for the dm+d parser.

    Expects a POST whose JSON body carries ``key`` (the TRUD zip's S3 key)
    and ``api_key`` (must match ``Env.DMD_X_API_KEY``). Delegates the heavy
    lifting to :func:`run`.

    Returns:
        ``(body, status, cors_headers)`` tuple in the shape
        functions_framework expects.
    """
    try:
        headers = cors_enabled_header(request)
        # CORS preflight — answer before any method/auth checks.
        if request.method == "OPTIONS":
            return "", 204, headers
        if request.method != "POST":
            return "Method not allowed", 405, headers
        obj = get_request_obj(request)
        logger.info(f"[DMD PARSER REQUEST BODY] {obj}")
        if not obj.get("key"):
            return "No key provided in body", 400, headers
        # Authenticate before doing any real work.
        if not obj.get("api_key") or obj.get("api_key") != Env.DMD_X_API_KEY:
            return "Invalid API KEY", 403, headers
        message, code = run(obj.get("key"))
        return message, code, headers
    except Exception as e:
        logger.exception(f"[DMD PARSER ERROR] {e}")
        # Fix: the original message had no separator and rendered as
        # "Internal Server Error<exception text>".
        return (
            f"Internal Server Error: {str(e)}",
            500,
            cors_enabled_header(request),
        )