aio

事件循环+回调

AsyncIO for the Working Python Developer

event loop :管理和分配的不同任务的执行,It registers them and handles distributing the flow of control between them. Coroutines(协程) :await会释放控制权到event loop ,协程一旦被Tasks包装成一个Future,就会安排到event loop上运行。Tasks挂起时,有其他的tasks pending 会进行上下文切换 Future:就是对象 包含可能或可能未执行的Tasks结果,This result may be an exception. context switch(上下文切换): 就是the event loop yielding the flow of control from one coroutine to the next

不同的task包装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import asyncio


async def foo():
    print('Running in foo')
    await asyncio.sleep(0)
    print('Explicit context switch to foo again')


async def bar():
    print('Explicit context to bar')
    await asyncio.sleep(0)
    print('Implicit context switch back to bar')


async def main():
    tasks = [foo(), bar()]
    await asyncio.gather(*tasks)
    # 与上面一样
    # await asyncio.wait(tasks)


asyncio.run(main())

Future 迭代器

gathering the coroutines into a list, each of them ready to be scheduled and executed. The as_completed function returns an iterator that will yield a completed future as they come in

1
2
3
4
5
6
7
8
9
10
11
12
async def main():
    start = time.time()
    futures = [fetch_async(i) for i in range(1, MAX_CLIENTS + 1)]
    """
    gathering the coroutines into a list, each of them ready to be scheduled and executed. 
    The as_completed function returns an iterator that will yield a completed future as they come in
    """
    for i, future in enumerate(asyncio.as_completed(futures)):
        result = await future
        print('{} {}'.format(">>" * (i + 1), result))

    print("Process took: {:.2f} seconds".format(time.time() - start))

同步异步

Future

状态

Pending Running Done Cancelled

When a future is done its result method will return the result of the future, if it’s pending or running it raises InvalidStateError, if it’s cancelled it will raise CancelledError, and finally if the coroutine raised an exception it will be raised again, which is the same behaviour as calling exception.

handle error

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
async def fetch_ip(service):
    start = time.time()
    print('Fetching IP from {}'.format(service.name))

    try:
        json_response = await aiohttp_get_json(service.url)
    except:
        return '{} is unresponsive'.format(service.name)

    ip = json_response[service.ip_attr]

    return '{} finished with result: {}, took: {:.2f} seconds'.format(
        service.name, ip, time.time() - start)


async def main():
    futures = [fetch_ip(service) for service in SERVICES]
    done, _ = await asyncio.wait(futures)

    for future in done:
        print(future.result())


asyncio.run(main())

timeout

1
2
3
4
5
6
7
8
9
10
11
futures = [fetch_ip(service) for service in SERVICES]
done, pending = await asyncio.wait(
    futures, timeout=timeout, return_when=FIRST_COMPLETED)

for future in pending:
    future.cancel()

for future in done:
    response["ip"] = future.result()

print(response)

回调

把I/O事件的等待和监听任务交给了 OS,那 OS 在知道I/O状态发生改变后(例如socket连接已建立成功可发送数据),它又怎么知道接下来该干嘛呢?只能回调。

缺点:

  • 回调层次过多时代码可读性差
  • 破坏代码结构 (原本从上而下的代码结构,要改成从内到外的)
  • 共享状态管理困难
  • 错误处理困难 一连串的回调构成一个完整的调用链,调用链断掉,接力传递的状态也会丢失,这种现象称为调用栈撕裂。所以,为了防止栈撕裂,异常必须以数据的形式返回,而不是直接抛出异常,然后每个回调中需要检查上次调用的返回值,以防错误吞没

换言之,程序得知道当前所处的状态,而且要将这个状态在不同的回调之间延续下去

协程

任务之间得相互通知,每个任务得有自己的状态。 每个协程具有自己的栈帧,当然能知道自己处于什么状态,协程之间可以协作那自然可以通知别的协程

生成器协程风格和回调风格对比总结 在回调风格中:

存在链式回调(虽然示例中嵌套回调只有一层) 请求和响应也不得不分为两个回调以至于破坏了同步代码那种结构 程序员必须在回调之间维护必须的状态。

而基于生成器协程的风格:

无链式调用 selector的回调里只管给future设置值,不再关心业务逻辑 loop 内回调callback()不再关注是谁触发了事件 已趋近于同步代码的结构 无需程序员在多个协程之间维护状态

yield from解决的就是在生成器里玩生成器不方便 让嵌套生成器不必通过循环迭代yield,而是直接yield from

1
2
3
4
5
6
7
def b():
    for i in a:
        yield i


def c():
    yield from a

进程是表示资源分配的基本单位 都有自己的独立内存空间 上下文进程间的切换开销(栈、寄存器、虚拟内存、文件句柄等)比较大

线程是操作系统能够进行运算调度的最小单位 共享进程的内存空间

进程上下文切换与线程上下文切换最主要的区别就是线程的切换虚拟空间内存是相同的(因为都是属于自己的进程),但是,进程切换的虚拟空间内存则是不同的。 通过操作系统内核来完成

进程调度,切换进程上下文,包括分配的内存,包括数据段,附加段,堆栈段,代码段,以及一些表格。 线程调度,切换线程上下文,主要切换堆栈,以及各寄存器,因为同一个进程里的线程除了堆栈不同。

进程切换分两步 1.切换页目录以使用新的地址空间 2.切换内核栈和硬件上下文。

对于linux来说,线程和进程的最大区别就在于地址空间。 对于线程切换,第1步是不需要做的,第2是进程和线程切换都要做的。所以明显是进程切换代价大

协程又称为轻量级线程,每个协程都自带了一个栈,可以认为一个协程就是一个函数和这个存放这个函数运行时数据的栈,这个栈非常小,一般只有几十kb。

协程优点

1、无需线程上下文切换的开销

2、无需原子操作锁定及同步的开销

3、方便切换控制流

4、高并发低成本

协程使用一般是解决 I/O阻塞

(为什么线程不能解决: 因为一个IO操作就阻塞了当前线程,导致其他代码无法执行,所以我们必须使用多线程或者多进程来并发执行代码,为多个用户服务。每个用户都会分配一个线程,如果遇到IO导致线程被挂起,其他用户的线程不受影响。

多线程和多进程的模型虽然解决了并发问题,但是系统不能无上限地增加线程。由于系统切换线程的开销也很大,所以,一旦线程数量过多,CPU的时间就花在线程切换上了,真正运行代码的时间就少了,结果导致性能严重下降。)

因为当一个I/O阻塞时,它可以切换到其他没有阻塞的协程上去继续执行,这样就有了比较高的效率

协程是一种用户态的轻量级线程,协程的调度完全由用户控制。协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈,直接操作栈则基本没有内核切换的开销,可以不加锁的访问全局变量,所以上下文的切换非常快。

py 协程

线程是内核进行抢占式的调度的,这样就确保了每个线程都有执行的机会。 而 coroutine 运行在同一个线程中,由语言的运行时中的 EventLoop(事件循环)来进行调度。 和大多数语言一样,在 Python 中,协程的调度是非抢占式的,也就是说一个协程必须主动让出执行机会,其他协程才有机会运行。 让出执行的关键字就是 await。也就是说一个协程如果阻塞了,持续不让出 CPU,那么整个线程就卡住了,没有任何并发。

PS: 作为服务端,event loop最核心的就是IO多路复用技术,所有来自客户端的请求都由IO多路复用函数来处理;作为客户端,event loop的核心在于利用Future对象延迟执行,并使用send函数激发协程,挂起,等待服务端处理完成返回后再调用CallBack函数继续下面的流程

go 协程

Go 语言通过系统的线程来多路派遣这些函数的执行,使得 每个用 go 关键字执行的函数可以运行成为一个单位协程。 当一个协程阻塞的时候,调度器就会自动把其他协程安排到另外的线程中去执行,从而实现了程序无等待并行化运行。 而且调度的开销非常小,一颗 CPU 调度的规模不下于每秒百万次,这使得我们能够创建大量的 goroutine,

go的协程本质上还是系统的线程调用,而Python中的协程是eventloop模型实现,所以虽然都叫协程,但并不是一个东西. Python 中的协程是严格的 1:N 关系,也就是一个线程对应了多个协程。虽然可以实现异步I/O,但是不能有效利用多核(GIL)。 而 Go 中是 M:N 的关系,也就是 N 个协程会映射分配到 M 个线程上,这样带来了两点好处:

  • 多个线程能分配到不同核心上,CPU 密集的应用使用 goroutine 也会获得加速.
  • 即使有少量阻塞的操作,也只会阻塞某个 worker 线程,而不会把整个程序阻塞。

两种协程对比:

  • async是非抢占式的,一旦开始采用 async 函数,那么你整个程序都必须是 async 的,不然总会有阻塞的地方(一遇阻塞对于没有实现异步特性的库就无法主动让调度器调度其他协程了),也就是说 async 具有传染性。
  • Python 整个异步编程生态的问题,之前标准库和各种第三方库的阻塞性函数都不能用了,如:requests,redis.py,open 函数等。所以 Python3.5后加入协程的最大问题不是不好用,而是生态环境不好,历史包袱再次上演,动态语言基础上再加上多核之间的任务调度,应该是很难的技术吧,真心希望python4.0能优化或者放弃GIL锁,使用多核提升性能。
  • goroutine 是 go 与生俱来的特性,所以几乎所有库都是可以直接用的,避免了 Python 中需要把所有库重写一遍的问题。
  • goroutine 中不需要显式使用 await 交出控制权,但是 Go 也不会严格按照时间片去调度 goroutine,而是会在可能阻塞的地方插入调度。goroutine 的调度可以看做是半抢占式的。

绕过GIL的两种思路

1
2
3
4
5
1. 绕过CPython,使用JPython等别的实现;



2. 把关键性能代码放到其他语言中实现,比如C++。