Skip to content

Commit 31ed6ba

Browse files
implement tasks API and adapt notification tests
1 parent d9dcad8 commit 31ed6ba

File tree

6 files changed

+95
-24
lines changed

6 files changed

+95
-24
lines changed

bioblend/_tests/GalaxyTestBase.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,14 @@
1818
@test_util.skip_unless_galaxy()
1919
class GalaxyTestBase(unittest.TestCase):
2020
gi: GalaxyInstance
21+
config: Dict[str, Any]
2122

2223
@classmethod
2324
def setUpClass(cls):
2425
galaxy_key = os.environ["BIOBLEND_GALAXY_API_KEY"]
2526
galaxy_url = os.environ["BIOBLEND_GALAXY_URL"]
2627
cls.gi = GalaxyInstance(url=galaxy_url, key=galaxy_key)
28+
cls.config = cls.gi.config.get_config()
2729

2830
def _test_dataset(self, history_id: str, contents: str = "1\t2\t3", **kwargs: Any) -> str:
2931
tool_output = self.gi.tools.paste_content(contents, history_id, **kwargs)

bioblend/_tests/TestGalaxyNotifications.py

Lines changed: 53 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
Optional,
1010
)
1111

12+
from galaxy.tool_util.verify.wait import wait_on
13+
1214
from bioblend.galaxy import GalaxyInstance
1315
from . import (
1416
GalaxyTestBase,
@@ -32,6 +34,8 @@ def test_notification_status(self):
3234
if not self.gi.users.get_current_user()["is_admin"]:
3335
self.skipTest("This tests requires the current user to be an admin, which is not the case.")
3436

37+
task_based = self.gi.config.get_config()["enable_celery_tasks"]
38+
3539
# user creation for the test
3640
user1 = self._create_local_test_user(password="password")
3741
user2 = self._create_local_test_user(password="password")
@@ -56,14 +60,14 @@ def test_notification_status(self):
5660
[user1["id"]],
5761
message="test_notification_status 1",
5862
)
59-
assert created_response_1["total_notifications_sent"] == 1
63+
self._assert_notifications_sent(task_based, created_response_1, 1)
6064

6165
# Both user1 and user2 will receive this notification
6266
created_response_2 = self._send_test_notification_to(
6367
[user1["id"], user2["id"]],
6468
message="test_notification_status 2",
6569
)
66-
assert created_response_2["total_notifications_sent"] == 2
70+
self._assert_notifications_sent(task_based, created_response_2, 2)
6771

6872
# All users will receive this broadcasted notification
6973
self._send_test_broadcast_notification(message="test_notification_status 3")
@@ -146,11 +150,16 @@ def test_get_user_notifications(self):
146150
if not self.gi.users.get_current_user()["is_admin"]:
147151
self.skipTest("This tests requires the current user to be an admin, which is not the case.")
148152

153+
task_based = self.gi.config.get_config()["enable_celery_tasks"]
154+
149155
# send the notifications
150156
user_id = [self.gi.users.get_current_user()["id"]]
151-
self._send_test_notification_to(user_id, message="test_notification_status 1")
152-
self._send_test_notification_to(user_id, message="test_notification_status 2")
153-
self._send_test_notification_to(user_id, message="test_notification_status 3")
157+
send_response = self._send_test_notification_to(user_id, message="test_notification_status 1")
158+
self._assert_notifications_sent(task_based, send_response, 1)
159+
send_response = self._send_test_notification_to(user_id, message="test_notification_status 2")
160+
self._assert_notifications_sent(task_based, send_response, 1)
161+
send_response = self._send_test_notification_to(user_id, message="test_notification_status 3")
162+
self._assert_notifications_sent(task_based, send_response, 1)
154163

155164
# this should fetch all the notifications
156165
created_response_1 = self.gi.notifications.get_user_notifications()
@@ -239,6 +248,7 @@ def test_show_notification(self):
239248
self.skipTest("This Galaxy instance is not configured to use notifications.")
240249
if not self.gi.users.get_current_user()["is_admin"]:
241250
self.skipTest("This tests requires the current user to be an admin, which is not the case.")
251+
task_based = self.gi.config.get_config()["enable_celery_tasks"]
242252

243253
# user creation for the test
244254
user = self._create_local_test_user(password="password")
@@ -247,10 +257,11 @@ def test_show_notification(self):
247257
user_gi = GalaxyInstance(url=self.gi.base_url, email=user["email"], password="password")
248258

249259
# send the test notification
250-
notification_response = self._send_test_notification_to([user["id"]], message="test_notification_status")[
251-
"notification"
252-
]
253-
notification_id = notification_response["id"]
260+
send_response = self._send_test_notification_to([user["id"]], message="test_notification_status")
261+
self._assert_notifications_sent(task_based, send_response, 1)
262+
263+
notifications = user_gi.notifications.get_user_notifications()
264+
notification_id = notifications[0]["id"]
254265

255266
# Fetch the notification
256267
notification = user_gi.notifications.show_notification(notification_id)
@@ -269,6 +280,7 @@ def test_update_notifications(self):
269280
self.skipTest("This Galaxy instance is not configured to use notifications.")
270281
if not self.gi.users.get_current_user()["is_admin"]:
271282
self.skipTest("This tests requires the current user to be an admin, which is not the case.")
283+
task_based = self.gi.config.get_config()["enable_celery_tasks"]
272284

273285
# user creation for the test
274286
user = self._create_local_test_user(password="password")
@@ -277,12 +289,14 @@ def test_update_notifications(self):
277289
user_gi = GalaxyInstance(url=self.gi.base_url, email=user["email"], password="password")
278290

279291
# send the test notifications and save their ids
280-
notification_1_id = self._send_test_notification_to([user["id"]], message="test_notification_status 1")[
281-
"notification"
282-
]["id"]
283-
notification_2_id = self._send_test_notification_to([user["id"]], message="test_notification_status 2")[
284-
"notification"
285-
]["id"]
292+
send_response = self._send_test_notification_to([user["id"]], message="test_notification_status 1")
293+
self._assert_notifications_sent(task_based, send_response, 1)
294+
send_response = self._send_test_notification_to([user["id"]], message="test_notification_status 2")
295+
self._assert_notifications_sent(task_based, send_response, 1)
296+
297+
notifications = user_gi.notifications.get_user_notifications()
298+
notification_1_id = notifications[0]["id"]
299+
notification_2_id = notifications[1]["id"]
286300

287301
# fetch the notifications
288302
notification_1 = user_gi.notifications.show_notification(notification_1_id)
@@ -349,6 +363,7 @@ def test_delete_notifications(self):
349363
self.skipTest("This Galaxy instance is not configured to use notifications.")
350364
if not self.gi.users.get_current_user()["is_admin"]:
351365
self.skipTest("This tests requires the current user to be an admin, which is not the case.")
366+
task_based = self.gi.config.get_config()["enable_celery_tasks"]
352367

353368
# user creation for the test
354369
user = self._create_local_test_user(password="password")
@@ -357,15 +372,17 @@ def test_delete_notifications(self):
357372
user_gi = GalaxyInstance(url=self.gi.base_url, email=user["email"], password="password")
358373

359374
# send the test notifications and save their ids
360-
notification_1 = self._send_test_notification_to([user["id"]], message="test_notification_status 1")
361-
print(notification_1)
362-
notification_1_id = notification_1["notification"]["id"]
363-
notification_2_id = self._send_test_notification_to([user["id"]], message="test_notification_status 2")[
364-
"notification"
365-
]["id"]
366-
notification_3_id = self._send_test_notification_to([user["id"]], message="test_notification_status 3")[
367-
"notification"
368-
]["id"]
375+
send_response = self._send_test_notification_to([user["id"]], message="test_notification_status 1")
376+
self._assert_notifications_sent(task_based, send_response, 1)
377+
send_response = self._send_test_notification_to([user["id"]], message="test_notification_status 2")
378+
self._assert_notifications_sent(task_based, send_response, 1)
379+
send_response = self._send_test_notification_to([user["id"]], message="test_notification_status 3")
380+
self._assert_notifications_sent(task_based, send_response, 1)
381+
382+
notifications = user_gi.notifications.get_user_notifications()
383+
notification_1_id = notifications[0]["id"]
384+
notification_2_id = notifications[1]["id"]
385+
notification_3_id = notifications[2]["id"]
369386

370387
# delete a single notifications
371388
response_1 = user_gi.notifications.delete_user_notification(notification_id=notification_1_id)
@@ -545,4 +562,16 @@ def _send_test_notification_to(
545562
user_ids=user_ids,
546563
expiration_time=(datetime.utcnow() + timedelta(days=1)),
547564
)
565+
print(f"_send_test_notification_to {notification=}")
548566
return notification
567+
568+
def _assert_notifications_sent(self, task_based, response, expected_count: int = 0):
569+
def task_success():
570+
return None if self.gi.tasks.get_task_status(task_id) != "SUCCESS" else True
571+
572+
if task_based:
573+
task_id = response["id"]
574+
assert task_id is not None
575+
wait_on(task_success, "Task successful", 60)
576+
else:
577+
assert response["total_notifications_sent"] == expected_count

bioblend/galaxy/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
notifications,
2323
quotas,
2424
roles,
25+
tasks,
2526
tool_data,
2627
tool_dependencies,
2728
tools,
@@ -111,6 +112,7 @@ def __init__(
111112
self.folders = folders.FoldersClient(self)
112113
self.tool_dependencies = tool_dependencies.ToolDependenciesClient(self)
113114
self.notifications = notifications.NotificationClient(self)
115+
self.tasks = tasks.TasksClient(self)
114116

115117
def __repr__(self) -> str:
116118
"""

bioblend/galaxy/tasks/__init__.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
"""
2+
Contains possible interaction dealing with Galaxy tasks.
3+
"""
4+
5+
from typing import TYPE_CHECKING
6+
7+
from bioblend.galaxy.client import Client
8+
9+
if TYPE_CHECKING:
10+
from uuid import UUID
11+
12+
from bioblend.galaxy import GalaxyInstance
13+
14+
15+
class TasksClient(Client):
16+
"""
17+
This endpoint only works on Galaxy 22.05 or later.
18+
"""
19+
20+
module = "tasks"
21+
22+
def __init__(self, galaxy_instance: "GalaxyInstance") -> None:
23+
super().__init__(galaxy_instance)
24+
25+
def get_task_status(self, task_id: "UUID") -> str:
26+
"""
27+
Determine state of task ID
28+
29+
:type task_id: UUID
30+
:param task_id: the task ID
31+
32+
:rtype: str
33+
:return: String indicating task state
34+
"""
35+
url = self._make_url() + f"/{task_id}/state"
36+
return self._get(url=url)

setup.cfg

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ console_scripts =
8282
[options.extras_require]
8383
testing =
8484
pytest
85+
galaxy-tool-util
8586

8687
[options.package_data]
8788
bioblend =

tox.ini

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ commands =
66
pytest {posargs}
77
deps =
88
pytest
9+
galaxy-tool-util
910
passenv =
1011
BIOBLEND_GALAXY_API_KEY
1112
BIOBLEND_GALAXY_MASTER_API_KEY

0 commit comments

Comments
 (0)