Skip to content
This repository was archived by the owner on Jun 10, 2024. It is now read-only.

Commit 7037a77

Browse files
plusplus1binux
authored andcommitted
optimise scheluler dynamic select limit and improve task queue (#796)
* optimise scheduler select-limit and task queue * fix test case in python2.6 * fix: time priority queue only compare exetime * update:add test case for time priority queue * optimise: add globally auto increasing value for task to keep priority queue in order
1 parent 6634729 commit 7037a77

File tree

3 files changed

+197
-15
lines changed

3 files changed

+197
-15
lines changed

pyspider/scheduler/scheduler.py

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -478,24 +478,51 @@ def _check_select(self):
478478
cnt = 0
479479
cnt_dict = dict()
480480
limit = self.LOOP_LIMIT
481-
for project in itervalues(self.projects):
481+
482+
# dynamic assign select limit for each project, use qsize as weight
483+
project_weights, total_weight = dict(), 0
484+
for project in itervalues(self.projects): # type:Project
482485
if not project.active:
483486
continue
484487
# only check project pause when select new tasks, cronjob and new request still working
485488
if project.paused:
486489
continue
487490
if project.waiting_get_info:
488491
continue
492+
493+
# task queue
494+
task_queue = project.task_queue # type:TaskQueue
495+
pro_weight = task_queue.size()
496+
total_weight += pro_weight
497+
project_weights[project.name] = pro_weight
498+
pass
499+
500+
min_project_limit = int(limit / 10.) # ensure minimum select limit for each project
501+
max_project_limit = int(limit / 3.0) # ensure maximum select limit for each project
502+
503+
for pro_name, pro_weight in iteritems(project_weights):
489504
if cnt >= limit:
490505
break
491506

507+
project = self.projects[pro_name] # type:Project
508+
492509
# task queue
493510
task_queue = project.task_queue
494511
task_queue.check_update()
495512
project_cnt = 0
496513

514+
# calculate select limit for project
515+
if total_weight < 1 or pro_weight < 1:
516+
project_limit = min_project_limit
517+
else:
518+
project_limit = int((1.0 * pro_weight / total_weight) * limit)
519+
if project_limit < min_project_limit:
520+
project_limit = min_project_limit
521+
elif project_limit > max_project_limit:
522+
project_limit = max_project_limit
523+
497524
# check send_buffer here. when not empty, out_queue may blocked. Not sending tasks
498-
while cnt < limit and project_cnt < limit / 10:
525+
while cnt < limit and project_cnt < project_limit:
499526
taskid = task_queue.get()
500527
if not taskid:
501528
break

pyspider/scheduler/task_queue.py

Lines changed: 45 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,11 @@
55
# http://binux.me
66
# Created on 2014-02-07 13:12:10
77

8-
import time
98
import heapq
109
import logging
1110
import threading
11+
import time
12+
1213
try:
1314
from UserDict import DictMixin
1415
except ImportError:
@@ -24,8 +25,21 @@
2425
cmp = lambda x, y: (x > y) - (x < y)
2526

2627

28+
class AtomInt(object):
29+
__value__ = 0
30+
__mutex__ = threading.RLock()
31+
32+
@classmethod
33+
def get_value(cls):
34+
cls.__mutex__.acquire()
35+
cls.__value__ = cls.__value__ + 1
36+
value = cls.__value__
37+
cls.__mutex__.release()
38+
return value
39+
40+
2741
class InQueueTask(DictMixin):
28-
__slots__ = ('taskid', 'priority', 'exetime')
42+
__slots__ = ('taskid', 'priority', 'exetime', 'sequence')
2943
__getitem__ = lambda *x: getattr(*x)
3044
__setitem__ = lambda *x: setattr(*x)
3145
__iter__ = lambda self: iter(self.__slots__)
@@ -36,19 +50,23 @@ def __init__(self, taskid, priority=0, exetime=0):
3650
self.taskid = taskid
3751
self.priority = priority
3852
self.exetime = exetime
53+
self.sequence = AtomInt.get_value()
3954

4055
def __cmp__(self, other):
4156
if self.exetime == 0 and other.exetime == 0:
42-
return -cmp(self.priority, other.priority)
57+
diff = -cmp(self.priority, other.priority)
4358
else:
44-
return cmp(self.exetime, other.exetime)
59+
diff = cmp(self.exetime, other.exetime)
60+
61+
# compare in-queue sequence number finally if two element has the same
62+
# priority or exetime
63+
return diff if diff != 0 else cmp(self.sequence, other.sequence)
4564

4665
def __lt__(self, other):
4766
return self.__cmp__(other) < 0
4867

4968

5069
class PriorityTaskQueue(Queue.Queue):
51-
5270
'''
5371
TaskQueue
5472
@@ -66,12 +84,10 @@ def _put(self, item, heappush=heapq.heappush):
6684
if item.taskid in self.queue_dict:
6785
task = self.queue_dict[item.taskid]
6886
changed = False
69-
if item.priority > task.priority:
70-
task.priority = item.priority
71-
changed = True
72-
if item.exetime < task.exetime:
73-
task.exetime = item.exetime
87+
if item < task:
7488
changed = True
89+
task.priority = max(item.priority, task.priority)
90+
task.exetime = min(item.exetime, task.exetime)
7591
if changed:
7692
self._resort()
7793
else:
@@ -113,7 +129,6 @@ def __delitem__(self, taskid):
113129

114130

115131
class TaskQueue(object):
116-
117132
'''
118133
task queue for scheduler, have a priority queue and a time queue for delayed tasks
119134
'''
@@ -155,7 +170,7 @@ def _check_time_queue(self):
155170
now = time.time()
156171
self.mutex.acquire()
157172
while self.time_queue.qsize() and self.time_queue.top and self.time_queue.top.exetime < now:
158-
task = self.time_queue.get_nowait()
173+
task = self.time_queue.get_nowait() # type: InQueueTask
159174
task.exetime = 0
160175
self.priority_queue.put(task)
161176
self.mutex.release()
@@ -173,9 +188,24 @@ def _check_processing(self):
173188
self.mutex.release()
174189

175190
def put(self, taskid, priority=0, exetime=0):
176-
'''Put a task into task queue'''
191+
"""
192+
Put a task into task queue
193+
194+
when use heap sort, if we put tasks(with the same priority and exetime=0) into queue,
195+
the queue is not a strict FIFO queue, but more like a FILO stack.
196+
It is very possible that when there are continuous big flow, the speed of select is
197+
slower than request, resulting in priority-queue accumulation in short time.
198+
In this scenario, the tasks more earlier entering the priority-queue will not get
199+
processed until the request flow becomes small.
200+
201+
Thus, we store a global atom self increasing value into task.sequence which represent
202+
the task enqueue sequence. When the comparison of exetime and priority have no
203+
difference, we compare task.sequence to ensure that the entire queue is ordered.
204+
"""
177205
now = time.time()
206+
178207
task = InQueueTask(taskid, priority, exetime)
208+
179209
self.mutex.acquire()
180210
if taskid in self.priority_queue:
181211
self.priority_queue.put(task)
@@ -189,7 +219,9 @@ def put(self, taskid, priority=0, exetime=0):
189219
if exetime and exetime > now:
190220
self.time_queue.put(task)
191221
else:
222+
task.exetime = 0
192223
self.priority_queue.put(task)
224+
193225
self.mutex.release()
194226

195227
def get(self):

tests/test_task_queue.py

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
#!/usr/bin/env python
2+
# -*- coding: utf-8 -*-
3+
4+
import time
5+
import unittest
6+
7+
import six
8+
from six.moves import queue as Queue
9+
10+
from pyspider.scheduler.task_queue import InQueueTask, TaskQueue
11+
12+
13+
class TestTaskQueue(unittest.TestCase):
14+
"""
15+
TestTaskQueue
16+
"""
17+
18+
def test_task_queue_in_time_order(self):
19+
tq = TaskQueue(rate=300, burst=1000)
20+
21+
queues = dict()
22+
tasks = dict()
23+
24+
for i in range(0, 100):
25+
it = InQueueTask(str(i), priority=int(i // 10), exetime=0)
26+
tq.put(it.taskid, it.priority, it.exetime)
27+
28+
if it.priority not in queues:
29+
queues[it.priority] = Queue.Queue()
30+
31+
q = queues[it.priority] # type:Queue.Queue
32+
q.put(it)
33+
tasks[it.taskid] = it
34+
six.print_('put, taskid=', it.taskid, 'priority=', it.priority, 'exetime=', it.exetime)
35+
for i in range(0, 100):
36+
task_id = tq.get()
37+
task = tasks[task_id]
38+
q = queues[task.priority] # type: Queue.Queue
39+
expect_task = q.get()
40+
self.assertEqual(task_id, expect_task.taskid)
41+
self.assertEqual(task.priority, int(9 - i // 10))
42+
six.print_('get, taskid=', task.taskid, 'priority=', task.priority, 'exetime=', task.exetime)
43+
44+
self.assertEqual(tq.size(), 100)
45+
self.assertEqual(tq.priority_queue.qsize(), 0)
46+
self.assertEqual(tq.processing.qsize(), 100)
47+
for q in six.itervalues(queues): # type:Queue.Queue
48+
self.assertEqual(q.qsize(), 0)
49+
pass
50+
51+
pass
52+
53+
54+
class TestTimeQueue(unittest.TestCase):
55+
def test_time_queue(self):
56+
57+
six.print_('Test time queue order by time only')
58+
59+
tq = TaskQueue(rate=300, burst=1000)
60+
61+
fifo_queue = Queue.Queue()
62+
63+
interval = 5.0 / 1000
64+
65+
for i in range(0, 20):
66+
it = InQueueTask(str(i), priority=int(i // 10), exetime=time.time() + (i + 1) * interval)
67+
tq.put(it.taskid, it.priority, it.exetime)
68+
fifo_queue.put(it)
69+
six.print_('put, taskid=', it.taskid, 'priority=', it.priority, 'exetime=', it.exetime)
70+
71+
self.assertEqual(tq.priority_queue.qsize(), 0)
72+
self.assertEqual(tq.processing.qsize(), 0)
73+
self.assertEqual(tq.time_queue.qsize(), 20)
74+
75+
for i in range(0, 20):
76+
t1 = fifo_queue.get()
77+
t2 = tq.time_queue.get()
78+
self.assertEqual(t1.taskid, t2.taskid)
79+
six.print_('get, taskid=', t2.taskid, 'priority=', t2.priority, 'exetime=', t2.exetime)
80+
self.assertEqual(tq.priority_queue.qsize(), 0)
81+
self.assertEqual(tq.processing.qsize(), 0)
82+
self.assertEqual(tq.time_queue.qsize(), 0)
83+
84+
queues = dict()
85+
tasks = dict()
86+
for i in range(0, 20):
87+
priority = int(i // 10)
88+
it = InQueueTask(str(i), priority=priority, exetime=time.time() + (i + 1) * interval)
89+
tq.put(it.taskid, it.priority, it.exetime)
90+
tasks[it.taskid] = it
91+
92+
if priority not in queues:
93+
queues[priority] = Queue.Queue()
94+
q = queues[priority]
95+
q.put(it)
96+
pass
97+
98+
self.assertEqual(tq.priority_queue.qsize(), 0)
99+
self.assertEqual(tq.processing.qsize(), 0)
100+
self.assertEqual(tq.time_queue.qsize(), 20)
101+
102+
time.sleep(20 * interval)
103+
tq.check_update()
104+
self.assertEqual(tq.priority_queue.qsize(), 20)
105+
self.assertEqual(tq.processing.qsize(), 0)
106+
self.assertEqual(tq.time_queue.qsize(), 0)
107+
for i in range(0, 20):
108+
taskid = tq.get()
109+
t1 = tasks[taskid]
110+
t2 = queues[t1.priority].get()
111+
self.assertEqual(t1.taskid, t2.taskid)
112+
113+
self.assertEqual(tq.priority_queue.qsize(), 0)
114+
self.assertEqual(tq.processing.qsize(), 20)
115+
self.assertEqual(tq.time_queue.qsize(), 0)
116+
117+
pass
118+
119+
pass
120+
121+
122+
if __name__ == '__main__':
123+
unittest.main()

0 commit comments

Comments
 (0)