-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathpython_functions.py
More file actions
166 lines (138 loc) · 4.91 KB
/
python_functions.py
File metadata and controls
166 lines (138 loc) · 4.91 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
import os
import sys
import requests
import numpy as np
import pandas as pd
import scipy.stats as spstats
from tqdm.notebook import tqdm
from joblib import Parallel, delayed
def check_dir(dir: str):
"""
Creates a given path driectory "dir" if it does not exist.
Args:
dir (str): Path to the directory.
"""
if os.path.exists(dir) and os.path.isdir(dir):
pass
else:
os.makedirs(dir)
def string_protein_search(string_ids: list, hgnc_table: pd.DataFrame):
"""
Retrieve gene symbols for a list of STRING protein IDs.
This function takes a list of STRING protein IDs and a pandas DataFrame containing HGNC gene data.
It then uses the Ensembl REST API to look up the corresponding transcript and gene IDs for each protein ID.
Finally, it merges the gene information with the HGNC data to return a DataFrame containing the original
STRING protein IDs and their corresponding gene symbols.
Parameters:
string_ids (list): A list of STRING protein IDs.
hgnc_table (pd.DataFrame): A pandas DataFrame containing HGNC gene data, with columns 'ensembl_gene_id' and 'symbol'.
Returns:
pd.DataFrame: A pandas DataFrame with columns 'string_ids' and 'symbol', containing the original STRING protein IDs and their corresponding gene symbols.
"""
proteins = {prot: prot.split(".")[1] for prot in string_ids}
server = "https://rest.ensembl.org"
ext = "/lookup/id"
headers = {"Content-Type": "application/json", "Accept": "application/json"}
# The first query returned transcript symbols
r = requests.post(
server + ext, headers=headers, json={"ids": list(proteins.values())}
)
if not r.ok:
r.raise_for_status()
sys.exit()
decoded = r.json()
transcripts = {}
for k, v in proteins.items():
if decoded[v] is not None:
transcripts[k] = decoded[v]["Parent"]
# The second query will return gene symbols
r = requests.post(
server + ext, headers=headers, json={"ids": list(transcripts.values())}
)
if not r.ok:
r.raise_for_status()
sys.exit()
decoded = r.json()
genes = []
for k, v in transcripts.items():
if decoded[v] is not None:
genes.append((k, decoded[v]["Parent"]))
genes = pd.DataFrame(genes, columns=pd.Index(["string_ids", "ensembl_gene_id"]))
genes = pd.merge(genes, hgnc_table)[["string_ids", "symbol"]]
return genes
def spearman_corr(expression, mutation_burden):
rho, pval = spstats.spearmanr(
expression,
mutation_burden,
)
return pd.Series(
{
"rho": rho,
"pval": pval,
}
)
def driver_neighbour_corr(
driver: int,
neighbourlist: np.ndarray,
mutationtab: np.ndarray,
expressiontab: np.ndarray,
ctfilter: np.ndarray | None = None,
) -> tuple:
if ctfilter is not None:
filt = ctfilter[driver]
else:
filt = np.array([True] * len(mutationtab))
exp = expressiontab[np.ix_(filt, neighbourlist)]
mask = np.flatnonzero(exp.sum(axis=0) > 0)
if len(mask) == 0:
# if driver has no expression in any neighbour
return [np.nan], [np.nan], [np.nan]
elif len(mask) == 1:
# if driver has only one neighbour with expression
rho, pvalue = spstats.spearmanr(exp[:, mask], mutationtab[filt, driver])
return [rho], [pvalue], neighbourlist[mask]
else:
rho, pvalue = spstats.spearmanr(exp[:, mask], mutationtab[filt, driver])
return rho[-1, :-1], pvalue[-1, :-1], neighbourlist[mask]
def between_cancer_corr(
exparray: np.ndarray,
mutationarray: np.ndarray,
graph: pd.DataFrame,
cancertype_filter: np.ndarray | None = None,
n_jobs: int = -1,
progressbar: bool = True,
):
driverlist = graph.columns.tolist()
if progressbar:
neighbourlist = tqdm([np.flatnonzero(graph[driver]) for driver in driverlist])
else:
neighbourlist = [np.flatnonzero(graph[driver]) for driver in driverlist]
corr_results = Parallel(n_jobs=n_jobs)(
delayed(driver_neighbour_corr)(
driver, neighbours, mutationarray, exparray, cancertype_filter
)
for driver, neighbours in enumerate(neighbourlist)
)
results = {
"driver": [],
"neighbour": [],
"rho": [],
"rho_pval": [],
}
for driver in range(len(driverlist)):
results["driver"].extend([driverlist[driver]] * len(corr_results[driver][2]))
results["neighbour"].extend(corr_results[driver][2])
results["rho"].extend(corr_results[driver][0])
results["rho_pval"].extend(corr_results[driver][1])
results = (
pd.DataFrame(results)
.set_index("neighbour")
.merge(
pd.Series(graph.index).rename("neighbour"),
left_index=True,
right_index=True,
)
.dropna()
.reset_index(drop=True)
)
return results