Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 45 additions & 2 deletions tests/test_executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import concurrent.futures
import multiprocessing
import unittest

import time
from winloop import _testbase as tb


Expand Down Expand Up @@ -37,9 +37,52 @@ def test_executors_process_pool_01(self):
def test_executors_process_pool_02(self):
self.run_pool_test(concurrent.futures.ThreadPoolExecutor)



class TestUVExecutors(_TestExecutors, tb.UVTestCase):
pass
# Only libuv can feasabily do this.
# this was implemented to help combat resource problems

def test_libuv_threadpool(self):
self.loop.set_default_executor(None)
async def run():
coros = []
for i in range(0, 10):
coros.append(self.loop.run_in_executor(None, fib, i))
res = await asyncio.gather(*coros)
self.assertEqual(res, fib10)
await asyncio.sleep(0.01)
fib10 = [fib(i) for i in range(10)]
self.loop.run_until_complete(run())

def test_libuv_threadpool_exception(self):
self.loop.set_default_executor(None)
async def run():
class TestException(Exception):
pass

def execption():
raise TestException("Hello")

with self.assertRaises(TestException):
await self.loop.run_in_executor(None, execption)

self.loop.run_until_complete(run())

def test_libuv_threadpool_cancellation(self):
self.loop.set_default_executor(None)

async def run():

def eternity():
time(3600)

fut = self.loop.run_in_executor(None, eternity)
fut.cancel()
with self.assertRaises(asyncio.CancelledError):
await fut
self.loop.run_until_complete(run())



class TestAIOExecutors(_TestExecutors, tb.AIOTestCase):
Expand Down
9 changes: 9 additions & 0 deletions winloop/includes/uv.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -518,3 +518,12 @@ cdef extern from "uv.h" nogil:
unsigned int uv_version()

int uv_pipe(uv_file fds[2], int read_flags, int write_flags)


ctypedef struct uv_work_t:
void* data

ctypedef void (*uv_work_cb)(uv_work_t* req)
ctypedef void (*uv_after_work_cb)(uv_work_t* req, int status)
int uv_queue_work(uv_loop_t* loop, uv_work_t* req, uv_work_cb work_cb, uv_after_work_cb after_work_cb);

101 changes: 30 additions & 71 deletions winloop/loop.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -128,74 +128,7 @@ cdef inline run_in_context2(context, method, arg1, arg2):
Py_DECREF(method)


def list2cmdline(seq):
"""
Translate a sequence of arguments into a command line
string, using the same rules as the MS C runtime:

1) Arguments are delimited by white space, which is either a
space or a tab.

2) A string surrounded by double quotation marks is
interpreted as a single argument, regardless of white space
contained within. A quoted string can be embedded in an
argument.

3) A double quotation mark preceded by a backslash is
interpreted as a literal double quotation mark.

4) Backslashes are interpreted literally, unless they
immediately precede a double quotation mark.

5) If backslashes immediately precede a double quotation mark,
every pair of backslashes is interpreted as a literal
backslash. If the number of backslashes is odd, the last
backslash escapes the next double quotation mark as
described in rule 3.
"""

# See
# http://msdn.microsoft.com/en-us/library/17w5ykft.aspx
# or search http://msdn.microsoft.com for
# "Parsing C++ Command-Line Arguments"
result = []
needquote = False
for arg in map(os.fsdecode, seq):
bs_buf = []

# Add a space to separate this argument from the others
if result:
result.append(' ')

needquote = (" " in arg) or ("\t" in arg) or not arg
if needquote:
result.append('"')

for c in arg:
if c == '\\':
# Don't know if we need to double yet.
bs_buf.append(c)
elif c == '"':
# Double backslashes.
result.append('\\' * len(bs_buf)*2)
bs_buf = []
result.append('\\"')
else:
# Normal char
if bs_buf:
result.extend(bs_buf)
bs_buf = []
result.append(c)

# Add remaining backslashes, if any.
if bs_buf:
result.extend(bs_buf)

if needquote:
result.extend(bs_buf)
result.append('"')

return ''.join(result).replace('\\"', '"')

# Used for deprecation and removal of `loop.create_datagram_endpoint()`'s
# *reuse_address* parameter
Expand Down Expand Up @@ -2849,12 +2782,17 @@ cdef class Loop:

if executor is None:
executor = self._default_executor
# Only check when the default executor is being used
# Only check when a default executor is set
# we
self._check_default_executor()
if executor is None:
executor = cc_ThreadPoolExecutor()
self._default_executor = executor

# if we do not have a default executor
# we should use libuv's threadpool
# to eliminate some bulkier payloads
# such as when running anything from a
# pure python executor for instance.
return _ExecutorFuture(self, func, args)

return aio_wrap_future(executor.submit(func, *args), loop=self)

def set_default_executor(self, executor):
Expand Down Expand Up @@ -3480,6 +3418,27 @@ class _SyncSocketWriterFuture(aio_Future):
self.__remove_writer()
aio_Future.cancel(self)

# used when no executor was passed and we want to
# utilize upon libuv's threadpool instead
class _ExecutorFuture(aio_Future):
def __init__(self, loop, func, args) -> None:
aio_Future.__init__(self, loop=loop)
self.__work = UVWork(loop, self, func, args)

def __remove_worker(self):
if self.__work is not None:
self.__work._cancel()
self.__work = None

if PY39:
def cancel(self, msg=None):
self.__remove_worker()
aio_Future.cancel(self, msg=msg)

else:
def cancel(self):
self.__remove_worker()
aio_Future.cancel(self)

include "cbhandles.pyx"
include "pseudosock.pyx"
Expand Down
20 changes: 20 additions & 0 deletions winloop/request.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,23 @@ cdef class UVRequest:

cdef on_done(self)
cdef cancel(self)





# Based off concurrent.futures.thread._WorkItem and Future
# this is a backup alternative so that some resources from
# using a default executor are eliminated.
cdef class UVWork(UVRequest):
cdef:
object fut # asyncio.Future[...]
object fn
object args
object result
object exc

cdef void run(self) noexcept with gil



82 changes: 82 additions & 0 deletions winloop/request.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,85 @@ cdef class UVRequest:
else:
ex = convert_error(err)
self.loop._handle_exception(ex)


cdef class UVWork(UVRequest):
cdef cancel(self):
UVRequest.cancel(self)
# if successful do the same on the future object's end.
if not self.fut.cancelled():
self.fut.cancel()

# shortcut
def _cancel(self):
return self.cancel()

cdef void run(self) noexcept with gil:
cdef object result
try:
# if the eventloop fires the task but we cancelled previously
# it's best to try exiting now instead of later...
if self.fut.cancelled():
return
try:
result = self.fn(*self.args)
except BaseException as exc:
self.exc = exc
else:
self.result = result
except BaseException as ex:
# if anything else fails. We don't have to be a sitting duck
# and let it slide. we can handle it as soon as possible...
self.exc = exc

def __cinit__(self, Loop loop, object fut, object fn, object args):
self.request = <uv.uv_req_t*>PyMem_RawMalloc(sizeof(uv.uv_work_t))
if self.request == NULL:
raise MemoryError

self.loop = loop
self.done = 0
self.fn = fn
self.args = args
self.result = None
self.exc = None
self.fut = fut

self.request.data = <void*>self

# UV_EINVAL will never happen because our callback exists...
uv.uv_queue_work(self.loop.uvloop, <uv.uv_work_t*>self.request, __on_work_cb, __on_after_work_cb)
Py_INCREF(self)

def __dealloc__(self):
if self.request != NULL:
PyMem_RawFree(self.request)



cdef void __on_work_cb(uv.uv_work_t* req) noexcept with gil:
(<UVWork>req.data).run()

cdef void __on_after_work_cb(uv.uv_work_t* req, int err) noexcept with gil:
cdef object ex
cdef UVWork work = (<UVWork>req.data)
try:
if err == uv.UV_ECANCELED:
if not work.fut.cancelled():
work.fut.cancel()

elif err != 0:
ex = convert_error(err)
work.fut.set_exception(ex)

if work.exc is not None:
work.fut.set_exception(work.exc)
else:
work.fut.set_result(work.result)
except BaseException as e:
work.loop._handle_exception(e)
finally:
work.on_done()



Loading