`

Python协程:从yield/send到async/await

阅读更多

Python由于众所周知的GIL的原因,导致其线程无法发挥多核的并行计算能力(当然,后来有了multiprocessing,可以实现多进程并行),显得比较鸡肋。既然在GIL之下,同一时刻只能有一个线程在运行,那么对于CPU密集的程序来说,线程之间的切换开销就成了拖累,而以I/O为瓶颈的程序正是协程所擅长的:

多任务并发(非并行),每个任务在合适的时候挂起(发起I/O)和恢复(I/O结束)

Python中的协程经历了很长的一段发展历程。其大概经历了如下三个阶段:

  1. 最初的生成器变形yield/send
  2. 引入@asyncio.coroutine和yield from
  3. 在最近的Python3.5版本中引入async/await关键字

从yield说起

先看一段普通的计算斐波那契续列的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def old_fib(n):
	res = [0] * n
	index = 0
	a = 0
	b = 1
	while index < n:
		res[index] = b
		a, b = b, a + b
		index += 1
	return res

print('-'*10 + 'test old fib' + '-'*10)
for fib_res in old_fib(20):
	print(fib_res)

 

如果我们仅仅是需要拿到斐波那契序列的第n位,或者仅仅是希望依此产生斐波那契序列,那么上面这种传统方式就会比较耗费内存。

这时,yield就派上用场了。

1
2
3
4
5
6
7
8
9
10
11
12
def fib(n):
	index = 0
	a = 0
	b = 1
	while index < n:
		yield b
		a, b = b, a + b
		index += 1

print('-'*10 + 'test yield fib' + '-'*10)
for fib_res in fib(20):
	print(fib_res)

 

当一个函数中包含yield语句时,python会自动将其识别为一个生成器。这时fib(20)并不会真正调用函数体,而是以函数体生成了一个生成器对象实例。

yield在这里可以保留fib函数的计算现场,暂停fib的计算并将b返回。而将fib放入for…in循环中时,每次循环都会调用next(fib(20)),唤醒生成器,执行到下一个yield语句处,直到抛出StopIteration异常。此异常会被for循环捕获,导致跳出循环。

Send来了

从上面的程序中可以看到,目前只有数据从fib(20)中通过yield流向外面的for循环;如果可以向fib(20)发送数据,那不是就可以在Python中实现协程了嘛。

于是,Python中的生成器有了send函数,yield表达式也拥有了返回值。

我们用这个特性,模拟一个额慢速斐波那契数列的计算:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def stupid_fib(n):
	index = 0
	a = 0
	b = 1
	while index < n:
		sleep_cnt = yield b
		print('let me think {0} secs'.format(sleep_cnt))
		time.sleep(sleep_cnt)
		a, b = b, a + b
		index += 1
print('-'*10 + 'test yield send' + '-'*10)
N = 20
sfib = stupid_fib(N)
fib_res = next(sfib)
while True:
	print(fib_res)
	try:
		fib_res = sfib.send(random.uniform(0, 0.5))
	except StopIteration:
		break

 

其中next(sfib)相当于sfib.send(None),可以使得sfib运行至第一个yield处返回。后续的sfib.send(random.uniform(0, 0.5))则将一个随机的秒数发送给sfib,作为当前中断的yield表达式的返回值。这样,我们可以从“主”程序中控制协程计算斐波那契数列时的思考时间,协程可以返回给“主”程序计算结果,Perfect!

yield from是个什么鬼?

yield from用于重构生成器,简单的,可以这么使用:

1
2
3
4
5
6
7
def copy_fib(n):
	print('I am copy from fib')
	yield from fib(n)
	print('Copy end')
print('-'*10 + 'test yield from' + '-'*10)
for fib_res in copy_fib(20):
	print(fib_res)

 

这种使用方式很简单,但远远不是yield from的全部。yield from的作用还体现可以像一个管道一样将send信息传递给内层协程,并且处理好了各种异常情况,因此,对于stupid_fib也可以这样包装和使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def copy_stupid_fib(n):
	print('I am copy from stupid fib')
	yield from stupid_fib(n)
	print('Copy end')
print('-'*10 + 'test yield from and send' + '-'*10)
N = 20
csfib = copy_stupid_fib(N)
fib_res = next(csfib)
while True:
	print(fib_res)
	try:
		fib_res = csfib.send(random.uniform(0, 0.5))
	except StopIteration:
		break

 

如果没有yield from,这里的copy_yield_from将会特别复杂(因为要自己处理各种异常)。

asyncio.coroutine和yield from

yield from在asyncio模块中得以发扬光大。先看示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@asyncio.coroutine
def smart_fib(n):
	index = 0
	a = 0
	b = 1
	while index < n:
		sleep_secs = random.uniform(0, 0.2)
		yield from asyncio.sleep(sleep_secs)
		print('Smart one think {} secs to get {}'.format(sleep_secs, b))
		a, b = b, a + b
		index += 1

@asyncio.coroutine
def stupid_fib(n):
	index = 0
	a = 0
	b = 1
	while index < n:
		sleep_secs = random.uniform(0, 0.4)
		yield from asyncio.sleep(sleep_secs)
		print('Stupid one think {} secs to get {}'.format(sleep_secs, b))
		a, b = b, a + b
		index += 1

if __name__ == '__main__':
	loop = asyncio.get_event_loop()
	tasks = [
		asyncio.async(smart_fib(10)),
		asyncio.async(stupid_fib(10)),
	]
	loop.run_until_complete(asyncio.wait(tasks))
	print('All fib finished.')
	loop.close()

 

asyncio是一个基于事件循环的实现异步I/O的模块。通过yield from,我们可以将协程asyncio.sleep的控制权交给事件循环,然后挂起当前协程;之后,由事件循环决定何时唤醒asyncio.sleep,接着向后执行代码。

这样说可能比较抽象,好在asyncio是一个由python实现的模块,那么我们来看看asyncio.sleep中都做了些什么:

1
2
3
4
5
6
7
8
9
10
@coroutine
def sleep(delay, result=None, *, loop=None):
    """Coroutine that completes after a given time (in seconds)."""
    future = futures.Future(loop=loop)
    h = future._loop.call_later(delay,
                                future._set_result_unless_cancelled, result)
    try:
        return (yield from future)
    finally:
        h.cancel()

 

首先,sleep创建了一个Future对象,作为更内层的协程对象,通过yield from交给了事件循环;其次,它通过调用事件循环的call_later函数,注册了一个回调函数。

通过查看Future类的源码,可以看到,Future是一个实现了__iter__对象的生成器:

1
2
3
4
5
6
7
8
9
class Future:
	#blabla...
    def __iter__(self):
        if not self.done():
            self._blocking = True
            yield self  # This tells Task to wait for completion.
        assert self.done(), "yield from wasn't used with future"
        return self.result()  # May raise too.

 

那么当我们的协程yield from asyncio.sleep时,事件循环其实是与Future对象建立了练习。每次事件循环调用send(None)时,其实都会传递到Future对象的__iter__函数调用;而当Future尚未执行完毕的时候,就会yield self,也就意味着暂时挂起,等待下一次send(None)的唤醒。

当我们包装一个Future对象产生一个Task对象时,在Task对象初始化中,就会调用Future的send(None),并且为Future设置好回调函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class Task(futures.Future):
	#blabla...
    def _step(self, value=None, exc=None):
		#blabla...
        try:
            if exc is not None:
                result = coro.throw(exc)
            elif value is not None:
                result = coro.send(value)
            else:
                result = next(coro)
		#exception handle
        else:
            if isinstance(result, futures.Future):
                # Yielded Future must come from Future.__iter__().
                if result._blocking:
                    result._blocking = False
                    result.add_done_callback(self._wakeup)
		#blabla...

    def _wakeup(self, future):
        try:
            value = future.result()
        except Exception as exc:
            # This may also be a cancellation.
            self._step(None, exc)
        else:
            self._step(value, None)
        self = None  # Needed to break cycles when an exception occurs.

 

预设的时间过后,事件循环将调用Future._set_result_unless_cancelled:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class Future:
	#blabla...
    def _set_result_unless_cancelled(self, result):
        """Helper setting the result only if the future was not cancelled."""
        if self.cancelled():
            return
        self.set_result(result)

    def set_result(self, result):
        """Mark the future done and set its result.

        If the future is already done when this method is called, raises
        InvalidStateError.
        """
        if self._state != _PENDING:
            raise InvalidStateError('{}: {!r}'.format(self._state, self))
        self._result = result
        self._state = _FINISHED
        self._schedule_callbacks()

 

这将改变Future的状态,同时回调之前设定好的Tasks._wakeup;在_wakeup中,将会再次调用Tasks._step,这时,Future的状态已经标记为完成,因此,将不再yield self,而return语句将会触发一个StopIteration异常,此异常将会被Task._step捕获用于设置Task的结果。同时,整个yield from链条也将被唤醒,协程将继续往下执行。

async和await

弄清楚了asyncio.coroutine和yield from之后,在Python3.5中引入的async和await就不难理解了:可以将他们理解成asyncio.coroutine/yield from的完美替身。当然,从Python设计的角度来说,async/await让协程表面上独立于生成器而存在,将细节都隐藏于asyncio模块之下,语法更清晰明了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
async def smart_fib(n):
	index = 0
	a = 0
	b = 1
	while index < n:
		sleep_secs = random.uniform(0, 0.2)
		await asyncio.sleep(sleep_secs)
		print('Smart one think {} secs to get {}'.format(sleep_secs, b))
		a, b = b, a + b
		index += 1

async def stupid_fib(n):
	index = 0
	a = 0
	b = 1
	while index < n:
		sleep_secs = random.uniform(0, 0.4)
		await asyncio.sleep(sleep_secs)
		print('Stupid one think {} secs to get {}'.format(sleep_secs, b))
		a, b = b, a + b
		index += 1

if __name__ == '__main__':
	loop = asyncio.get_event_loop()
	tasks = [
		asyncio.ensure_future(smart_fib(10)),
		asyncio.ensure_future(stupid_fib(10)),
	]
	loop.run_until_complete(asyncio.wait(tasks))
	print('All fib finished.')
	loop.close()

 

想要继续弄清楚async/await和asyncio.coroutine/yield from的区别,可以看看这篇文章

总结


至此,Python中的协程就介绍完毕了。示例程序中都是以sleep为异步I/O的代表,在实际项目中,可以使用协程异步的读写网络、读写文件、渲染界面等,而在等待协程完成的同时,CPU还可以进行其他的计算。协程的作用正在于此。

相关代码可以在GitHub上找到https://github.com/yubo1911/saber/tree/master/coroutine

转载请注明出处: http://blog.guoyb.com/2016/07/03/python-coroutine/

分享到:
评论

相关推荐

    Python3.5中async_await特性的实现.pdf

    在Python中,协程的演化过程经历了从生成器(Generator)到增强型生成器(Enhanced Generators),最终发展到了async/await。 在Python 2.2版本中,PEP 255提出了简单生成器的概念,生成器提供了一种方便的迭代器...

    深入浅析python 协程与go协程的区别

    my_coro.send(42) # 发送数据到协程 ``` ##### 2.2 基于 `async/await` 的异步编程 Python 3.5 引入了 `async/await` 语法,这使得编写异步代码变得更加自然和直观。`async def` 定义了一个协程函数,而 `await` ...

    浅析python协程相关概念

    Python协程是一种高级编程概念,它允许程序员创建能够暂停和恢复...理解生成器、协程以及`async/await`语法对于编写高性能的Python代码至关重要。通过实践和深入学习,你可以掌握这一强大的工具,提升你的编程能力。

    python协程用法实例分析

    - 发送数据到协程:`matcher.send('hello python')`,`send()`方法可以把数据传入协程,并使协程从上次暂停的地方继续执行,就像`yield`的返回值一样。 3. **协程的交互**: - `send()`方法不仅可以传递数据,还...

    为什么你还不懂得怎么使用Python协程

    在Python 3.5及更高版本中,引入了`async`和`await`这两个关键字,使协程的定义和使用变得更加简洁和直观。`async def`用于定义协程函数,而`await`关键字用于在协程内部等待另一个协程的完成。例如: ```python ...

    python中的协程深入理解

    总之,Python中的协程是实现异步编程的关键工具,它们通过`yield`和`async/await`语法提供了轻量级的并发机制,特别适用于处理I/O密集型任务。理解和熟练掌握协程,对于编写高性能的Python应用程序至关重要。

    Python-3.5.3.tgz

    Python 3.5引入了异步IO的核心支持,通过`async`和`await`关键字,使得编写协程(coroutines)更加直观。在3.5.3中,这个功能得到了进一步的完善和优化,使得并发编程更加高效,适用于网络I/O密集型应用。 2. **...

    python高级编程.pdf

    - **`async`和`await`原生协程**:Python 3.5引入的新特性,支持异步编程。 #### 第十三章:AsyncIO并发编程 - **事件循环**:异步编程的核心机制。 - **协程嵌套**:在协程中调用其他协程。 - **`...

    Python yield的用法实例分析

    生成器的另一个重要应用场景是创建并发或异步代码,尤其是配合`asyncio`库中的`async`和`await`关键字。在这里,`yield from`(Python 3.3+)或`yield`可以用来暂停协程的执行,等待其他任务完成,从而实现非阻塞的I...

    python3.5.tar.gz

    `async def`定义了一个协程函数,而`await`用于挂起协程并在等待异步操作完成时恢复执行。 2. **新的字典方法**:Python 3.5对字典类进行了增强,添加了`get_or_default`、`popitem(last=False)`和`keysviews()`等...

    python官方3.5.0b2版本exe安装包

    其次,Python 3.5引入了异步IO支持,这是通过引入`async`和`await`关键字实现的。这些关键字允许开发者编写非阻塞的并发代码,显著提升了处理I/O密集型任务的效率。`asyncio`库成为了Python标准库的一部分,为编写...

Global site tag (gtag.js) - Google Analytics