Python Learning Summary
When writing programs, we often move from a single serial execution model to multithreading or multiprocessing to improve throughput; these notes summarize how Python coroutines can raise a program's efficiency further. Multiprocessing suits CPU-bound programs (heavy loops, counting, and so on), while multithreading suits I/O-bound programs (file reads, web crawlers, etc.). Because of Python's GIL (global interpreter lock), however, I/O-bound programs gain less from threads than expected, especially when multithreading runs on a multi-core CPU. On a single core, each time the GIL is released, the thread that wakes up can take the lock and run seamlessly; on multiple cores, once the thread on CPU0 releases the GIL, threads on the other CPUs compete for it, but CPU0's thread may grab it right back, so the threads awakened on the other cores stay awake waiting until their time slice ends and then drop back into the scheduling queue. This causes thread thrashing and makes things even slower. For I/O-bound tasks in Python, coroutines are therefore an effective way to improve efficiency.
Summary of related concepts
Why Python multithreading is underwhelming
- Under Python multithreading, each thread executes like this:
  - acquire the GIL
  - run code until it sleeps or the Python virtual machine suspends it
  - release the GIL
- So before a thread can run, it must first obtain the GIL. Think of the GIL as a "permit", of which each Python process has exactly one; a thread without the permit is not allowed onto the CPU.
- Every release of the GIL triggers lock contention and a thread switch, which costs resources. And because of the GIL, one Python process can only ever execute one thread at a time (the thread holding the GIL). The CPU-bound sketch below makes this concrete.
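To make the effect concrete, here is a small benchmark (my own sketch, not from the original notes) that runs a CPU-bound countdown once serially and once across two threads; because only the GIL holder executes, the threaded version is no faster:

```python
import threading
import time

def count(n):
    # pure CPU-bound loop: the GIL is never released voluntarily here
    while n > 0:
        n -= 1

N = 10_000_000

start = time.time()
count(N)
count(N)
print('serial:  ', time.time() - start)

start = time.time()
t1 = threading.Thread(target=count, args=(N,))
t2 = threading.Thread(target=count, args=(N,))
t1.start(); t2.start()
t1.join(); t2.join()
# takes about as long as (often longer than) the serial run:
# only one thread can hold the GIL at any moment
print('threaded:', time.time() - start)
```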
What is multiprocessing, and why doesn't it suffer like Python multithreading
- Multiprocessing exploits the CPU's multiple cores to execute several tasks in parallel at the same moment, which can greatly improve execution efficiency.
- Each process has its own independent GIL; processes do not interfere with one another, so they can run in true parallel. This is why, in Python, multiprocessing outperforms multithreading (on multi-core CPUs only).
- To get a parallel speedup from a multi-core CPU, the standard approach in Python is multiprocessing, which raises parallel efficiency effectively (see the sketch below).
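As a counterpart to the threading benchmark above, here is a hedged sketch of the same CPU-bound countdown on a multiprocessing.Pool; with separate processes, each with its own GIL, the two counts genuinely run in parallel:

```python
import multiprocessing
import time

def count(n):
    # the same CPU-bound loop as before
    while n > 0:
        n -= 1

if __name__ == '__main__':
    N = 10_000_000
    start = time.time()
    with multiprocessing.Pool(2) as pool:
        pool.map(count, [N, N])   # two worker processes, one countdown each
    # on a multi-core machine this is close to half the serial time
    print('multiprocess:', time.time() - start)
```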
What is blocking
- A blocked state is one where a program is suspended because it has not yet obtained the computing resource it needs: while waiting for some operation to complete, it can do nothing else.
- Common forms of blocking include network I/O, disk I/O, and user input. Blocking is everywhere; even while the CPU switches context, no process can get real work done, so they are all effectively blocked (and on a multi-core CPU, the core performing the context switch is unavailable).
What is non-blocking
- A non-blocking state is one where the program is not itself suspended while waiting for an operation and can continue with other work.
- Non-blocking is not available at every program level or in every situation; it is only possible when the program is organized into independent subunits of execution. A minimal socket-level sketch follows this list.
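As a concrete illustration (my own sketch, not part of the original notes), a socket can be put into non-blocking mode so that a connect call returns immediately instead of suspending the program:

```python
import socket

sock = socket.socket()
sock.setblocking(False)   # non-blocking mode: calls return immediately
try:
    sock.connect(('example.com', 80))   # returns before the handshake completes
except BlockingIOError:
    # instead of waiting for the connection, the program is free to do other
    # work and check back later -- this is exactly what an event loop automates
    pass
```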
What is synchrony
- Synchrony is a mode in which different program units working on one task must coordinate through some form of communication while they execute.
- Synchronous means ordered.
What is asynchrony
- Asynchrony is a mode in which program units working on one task need no communication or coordination while they execute; unrelated program units can be asynchronous with respect to each other.
- Asynchronous means unordered.
What is a coroutine
- A coroutine, also called a micro-thread or fiber, is a lightweight user-space thread.
- A coroutine owns its own register context and stack. When it is switched out, its registers and stack are saved aside; when it is switched back in, the saved context is restored. A coroutine therefore keeps the state of its previous call and, on every re-entry, resumes from exactly that state (see the toy generator sketch below).
- Coroutines run within a single process. Compared with multiprocessing, they avoid the cost of thread context switches and the cost of locking, synchronization, and atomic operations, and the programming model is very simple. Setting the GIL aside, coroutine context switches happen inside one process and never cross process boundaries, which is what makes the model so efficient.
- Coroutines enable asynchronous operation. In a web-crawling scenario, for instance, after a request is sent (even one that eventually times out) there is no need to sit and wait; the program can handle many other tasks in the meantime and switch back once the response arrives. This sharply raises CPU utilization, so async coroutines can greatly speed up I/O-bound tasks.
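As a toy illustration (mine, not from the original notes) of "resume where you left off", plain generators show the mechanism classic coroutines build on: every yield saves the function's local state and hands control back to the caller.

```python
def counter(name, n):
    for i in range(n):
        print(name, i)
        yield   # suspend here; local state (name, i) is preserved

# drive the two coroutines alternately -- each resumes from its last yield
a, b = counter('A', 3), counter('B', 3)
for _ in range(3):
    next(a)
    next(b)
```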
Using async coroutines in Python
Python introduced coroutines in version 3.4, built on generator objects and fairly clumsy to operate; version 3.5 added async/await, which makes working with coroutines much simpler. These notes use asyncio, the most commonly used coroutine library in Python (itself generator-based under the hood). Another option is Gevent, but Gevent depends on broad third-party support: every networking library the project uses must be pure Python or explicitly Gevent-compatible, and because Gevent monkey-patches most of the blocking system calls in the standard library, unexpected problems can surface. Hence asyncio is used here for async coroutines.
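For reference, this is roughly what the Gevent approach looks like (a minimal sketch assuming gevent is installed; not part of the original notes): monkey-patching swaps the blocking standard-library calls for cooperative ones.

```python
from gevent import monkey
monkey.patch_all()   # replace blocking socket/time calls with cooperative versions

import time
import gevent

def task(n):
    time.sleep(1)   # now yields to the gevent hub instead of blocking the process
    print('task', n, 'done')

jobs = [gevent.spawn(task, i) for i in range(5)]
gevent.joinall(jobs)   # all five finish in about 1 second instead of 5
```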
Basic concepts
- event_loop: the event loop, essentially an infinite loop. We can register functions on it, and when the triggering condition occurs, the corresponding handler is called. (Think of it as a listening mechanism.)
- coroutine: in Python this usually refers to the coroutine object type. We can register a coroutine object on the event loop, and the loop will invoke it. A method defined with the async keyword is not executed immediately when called; it returns a coroutine object instead. (This leaves scheduling in the developer's hands.)
- task: a further wrapper around a coroutine object that carries the task's various states.
- future: represents the result of a task that will run or has not yet run; in practice there is no essential difference from a task (a sketch of a bare Future follows this list).
- The async/await keywords, introduced in Python 3.5, exist specifically for defining coroutines: async defines a coroutine, and await suspends execution at a blocking call.
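Since every example below works with Tasks, here is a small sketch (mine, not from the original) of a bare Future, the type that Task builds on; the loop waits on it until something sets its result.

```python
import asyncio

async def waiter(fut):
    print('waiting for the future...')
    result = await fut   # suspends until someone calls fut.set_result()
    print('got:', result)

loop = asyncio.get_event_loop()
fut = loop.create_future()                   # a plain Future bound to this loop
loop.call_later(1, fut.set_result, 'done')   # resolve it after 1 second
loop.run_until_complete(waiter(fut))
```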
Defining coroutines
Registering just a plain coroutine object:
```python
import asyncio

async def execute(x):
    # async defines a coroutine; calling it returns a coroutine object
    print('Number: ', x)

coroutine = execute(1)
print('Coroutine: ', coroutine)
print('After calling execute')

loop = asyncio.get_event_loop()      # create the event loop
loop.run_until_complete(coroutine)   # register the coroutine on the loop, then start it
print('After calling loop')

# Output:
Coroutine: <coroutine object execute at 0x000001C08233BEB8>
After calling execute
Number: 1
After calling loop
```

Wrapping the coroutine object further into a task object adds run states such as running and finished, which can be used to track the coroutine's execution.
```python
import asyncio

async def execute(x):
    print('Number:', x)
    return x

coroutine = execute(1)
print('Coroutine:', coroutine)
print('After calling execute')

loop = asyncio.get_event_loop()
task = loop.create_task(coroutine)   # wrap the coroutine object returned by the async def into a task
print('Task:', task)
loop.run_until_complete(task)        # register the task on the event loop, then start it
print('Task:', task)
print('After calling loop')

# Output (note the task's state changing):
Coroutine: <coroutine object execute at 0x00000259A21EAEB8>
After calling execute
Task: <Task pending coro=<execute() running at D:/python/实战项目/Coroutie2.py:3>>
Number: 1
Task: <Task finished coro=<execute() done, defined at D:/python/实战项目/Coroutie2.py:3> result=1>
After calling loop
```

Instead of going through the loop's create_task() method, the task object can also be created directly with asyncio's ensure_future() method:
```python
import asyncio

async def execute(x):
    print('Number:', x)
    return x

coroutine = execute(1)
print('Coroutine:', coroutine)
print('After calling execute')

task = asyncio.ensure_future(coroutine)   # create the task object directly
print('Task:', task)
loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print('Task:', task)
print('After calling loop')

# Output: the same as with loop.create_task()
Coroutine: <coroutine object execute at 0x000002C99E56AEB8>
After calling execute
Task: <Task pending coro=<execute() running at D:/python/实战项目/Coroutine3.py:3>>
Number: 1
Task: <Task finished coro=<execute() done, defined at D:/python/实战项目/Coroutine3.py:3> result=1>
After calling loop
```
Binding a callback
A callback method can be bound to a task object by calling add_done_callback():
```python
import asyncio
import requests

async def Request():
    url = 'https://www.baidu.com'
    status = requests.get(url)
    return status

def callback(task):
    print('Status:', task.result())

coroutine = Request()
task = asyncio.ensure_future(coroutine)
task.add_done_callback(callback)   # bind the callback
print('Task:', task)
loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print('Task:', task)

# Output:
Task: <Task pending coro=<Request() running at D:
Status: <Response [200]>
Task: <Task finished coro=<Request() done, defined at D:/python/实战项目/Coroutine4.py:4> result=<Response [200]>>
```

Without a callback, the result() method can simply be called on the task once it has finished running:
```python
import asyncio
import requests

async def Request():
    url = 'https://www.baidu.com'
    status = requests.get(url)
    return status

coroutine = Request()
task = asyncio.ensure_future(coroutine)
print('Task:', task)
loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print('Task:', task)
print('Task Result:', task.result())

# Output:
Task: <Task pending coro=<Request() running at D:
Task: <Task finished coro=<Request() done, defined at D:/python/实战项目/Coroutine4.py:4> result=<Response [200]>>
Task Result: <Response [200]>
```
Multi-task coroutines (built from a list of tasks and run with the asyncio.wait() method)
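The listing here survives only as its first line, so what follows is a minimal reconstruction (a sketch consistent with the preceding examples, not the author's original code): build a list of tasks and hand it to asyncio.wait().

```python
import asyncio

async def execute(x):
    print('Number:', x)
    return x

tasks = [asyncio.ensure_future(execute(i)) for i in range(5)]   # a list of task objects
print('Tasks:', tasks)

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))   # wait() bundles the whole list

for task in tasks:
    print('Task Result:', task.result())
```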
Coroutine implementation
Everything above only defines and runs coroutines; no asynchronous processing has happened yet. Below, Flask simulates a network request with latency, so that the suspend-and-resume, asynchronous behaviour can be exercised.
A Flask server that simulates network latency
```python
from flask import Flask
import time

app = Flask(__name__)

@app.route('/')   # route for the root URL
def index():
    time.sleep(3)   # each request takes 3 seconds to answer, simulating a slow network
    return 'Hello!'

if __name__ == '__main__':
    # threaded=True runs the server in multithreaded mode; without it the server is
    # single-threaded, and even asynchronous client requests would queue up and be
    # served one at a time
    app.run(threaded=True)
```

Requesting the simulated server with coroutines:
```python
import asyncio
import requests
import time

start = time.time()

async def Request():
    url = 'http://127.0.0.1:5000'
    print('Waiting for', url)
    response = requests.get(url)
    print('Get response from ', url, 'Result: ', response.text)

tasks = [asyncio.ensure_future(Request()) for _ in range(5)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

end = time.time()
print('Cost time:', end - start)

# Output:
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Cost time: 15.040287971496582
```

Requested through coroutines, the pages are still fetched one after another and take 15 s; nothing got faster. The reason: what makes coroutines efficient is asynchrony, and for requests to be asynchronous, the program must be able to suspend a blocked coroutine and yield control so other coroutines can run. From Python 3.5 onwards this suspension is expressed with await: when the event loop reaches an await, it suspends the coroutine there.
Following that point, the operation that needs to be suspended is the page request itself, so the request method is modified:
```python
import asyncio
import requests
import time

start = time.time()

async def Request():
    url = 'http://127.0.0.1:5000'
    print('Waiting for', url)
    response = await requests.get(url)
    print('Get response from ', url, 'Result: ', response.text)

tasks = [asyncio.ensure_future(Request()) for _ in range(5)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

end = time.time()
print('Cost time:', end - start)

# Output:
Waiting for http://127.0.0.1:5000
Waiting for http://127.0.0.1:5000
Waiting for http://127.0.0.1:5000
Waiting for http://127.0.0.1:5000
Waiting for http://127.0.0.1:5000
Cost time: 15.033875942230225
Traceback (most recent call last):
  File "D:/python/实战项目/Coroutine8.py", line 10, in Request
    response = await requests.get(url)
TypeError: object Response can't be used in 'await' expression
```

This change does not work either. The consecutive "Waiting for" lines show the coroutines really are being suspended now, but the object after await is wrong: await must be followed by a native coroutine object (or by a generator decorated with types.coroutine() that returns one), whereas requests.get() returns a plain Response object, which does not qualify. The next attempt therefore wraps the page request in a separate async method, so that calling it produces a coroutine object:
```python
import asyncio
import requests
import time

start = time.time()

async def get(url):
    return requests.get(url)

async def Request():
    url = 'http://127.0.0.1:5000'
    print('Waiting for', url)
    response = await get(url)
    print('Get response from ', url, 'Result: ', response.text)

tasks = [asyncio.ensure_future(Request()) for _ in range(5)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

end = time.time()
print('Cost time:', end - start)

# Output:
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Cost time: 15.035670042037964
```

So simply wrapping blocking I/O code inside an async method does not help: the request is still synchronous inside, and no asynchrony is achieved. A request method that genuinely supports asynchronous I/O is required. Using aiohttp instead cuts the time to a fifth of the original, a huge improvement:
```python
# Do not create a new session for every request: normally a single session is
# created and used for all requests. Each session holds a connection pool and
# keeps/reuses connections (enabled by default), which speeds things up overall.
import asyncio
import aiohttp
import time

start = time.time()

async def get(url):
    session = aiohttp.ClientSession()
    response = await session.get(url)
    result = await response.text()
    await session.close()
    # the await is required: ClientSession.close() is itself a coroutine, and
    # calling it without awaiting leaves the session unclosed (see the error below)
    return result

async def request():
    url = 'http://127.0.0.1:5000'
    print('Waiting for', url)
    result = await get(url)
    print('Get response from', url, 'Result:', result)

tasks = [asyncio.ensure_future(request()) for _ in range(5)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

end = time.time()
print('Cost time:', end - start)

# Output:
Waiting for http://127.0.0.1:5000
Waiting for http://127.0.0.1:5000
Waiting for http://127.0.0.1:5000
Waiting for http://127.0.0.1:5000
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Get response from http://127.0.0.1:5000 Result: Hello!
Get response from http://127.0.0.1:5000 Result: Hello!
Get response from http://127.0.0.1:5000 Result: Hello!
Get response from http://127.0.0.1:5000 Result: Hello!
Cost time: 3.0310966968536377

# Output after changing `await session.close()` to `session.close()`:
Waiting for http://127.0.0.1:5000
Waiting for http://127.0.0.1:5000
Waiting for http://127.0.0.1:5000
Waiting for http://127.0.0.1:5000
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Get response from http://127.0.0.1:5000 Result: Hello!
Get response from http://127.0.0.1:5000 Result: Hello!
Get response from http://127.0.0.1:5000 Result: Hello!
Get response from http://127.0.0.1:5000 Result: Hello!
Cost time: 3.012549638748169
D:/python/Conoutine10.py:12: RuntimeWarning: coroutine 'ClientSession.close' was never awaited
  session.close()
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x0000022C36EC9D68>
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x0000022C36EC9EF0>
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x0000022C36EC9F28>
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x0000022C36EC9F98>
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x0000022C36EC9F60>
```

After these successive revisions we finally have real asynchronous coroutines. When execution starts, the event loop runs the first task. At the first await, the get() method, the task is suspended, but that first step is non-blocking, so it is woken again right away, enters get(), and creates the ClientSession object. It then reaches the second await, the session.get() request, and is suspended again; this time the request takes a long while, so it is not woken up. With the first task suspended, what next? The event loop looks for a coroutine that is not currently suspended and moves on to the second task, which goes through exactly the same steps, and so on until the fifth task has issued its session.get() and every task is suspended. With all tasks suspended, there is nothing to do but wait. Three seconds later the responses arrive almost simultaneously, the tasks are woken up, continue, and print their results. Total time: 3 seconds, matching the 3-second delay configured in the simulated server.
From these results, all 5 tasks spend the 3 s suspended together. By the same logic, the CPU can handle far more than 5 tasks within those 3 s, up to some limit that depends on the machine (on mine, raising the count to 1000 produces errors). Within that limit, any number of tasks takes about 3 s to process (provided the server can take the load and answers every request correctly). Changing 5 to 300 to test: 60 times the tasks, still about 3 s of processing time, which shows the strength of coroutines; the small extra time is I/O transfer latency.
```python
import asyncio
import aiohttp
import time

start = time.time()

async def get(url):
    session = aiohttp.ClientSession()
    response = await session.get(url)
    result = await response.text()
    await session.close()
    return result

async def request():
    url = 'http://127.0.0.1:5000'
    print('Waiting for', url)
    result = await get(url)
    print('Get response from', url, 'Result:', result)

tasks = [asyncio.ensure_future(request()) for _ in range(300)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

end = time.time()
print('Cost time:', end - start)
```
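As for the errors seen when pushing the task count to 1000: a common remedy (my own addition, not from the original notes) is to cap how many requests are in flight at once with an asyncio.Semaphore, and to share a single session across all tasks:

```python
import asyncio
import aiohttp
import time

CONCURRENCY = 50
semaphore = asyncio.Semaphore(CONCURRENCY)

async def get(session, url):
    async with semaphore:   # at most CONCURRENCY coroutines pass this point at once
        async with session.get(url) as response:
            return await response.text()

async def main():
    url = 'http://127.0.0.1:5000'
    async with aiohttp.ClientSession() as session:   # one shared session
        tasks = [asyncio.ensure_future(get(session, url)) for _ in range(1000)]
        await asyncio.gather(*tasks)

start = time.time()
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
print('Cost time:', time.time() - start)
```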
Comparison with single-process, multi-process, and multithreaded approaches
Single process, single thread (10 requests as the example):
```python
import requests
import time

start = time.time()

def Request():
    url = 'http://127.0.0.1:5000'
    print('Waiting for', url)
    result = requests.get(url).text
    print('Get response from', url, 'Result:', result)

for _ in range(10):
    Request()

end = time.time()
print('Cost time:', end - start)

# Output:
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Cost time: 30.057233095169067
```

Multiprocessing (10 requests):
```python
import requests
import time
import multiprocessing

start = time.time()

def Request(_):
    url = 'http://127.0.0.1:5000'
    print('Waiting for', url)
    result = requests.get(url).text
    print('Get response from', url, 'Result:', result)

if __name__ == '__main__':
    cpu_count = multiprocessing.cpu_count()   # number of CPUs
    print('Cpu count:', cpu_count)
    pool = multiprocessing.Pool(cpu_count)
    pool.map(Request, range(10))
    end = time.time()
    print('Cost time:', end - start)

# Output:
Cpu count: 4
Waiting for http://127.0.0.1:5000
Waiting for http://127.0.0.1:5000
Waiting for http://127.0.0.1:5000
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Get response from http://127.0.0.1:5000 Result: Hello!
Get response from http://127.0.0.1:5000 Result: Hello!
Cost time: 10.076906442642212
```

Multithreading (10 requests): as written below, each thread is join()ed immediately after it starts, so the requests still run strictly one at a time and the total time matches the single-threaded version, with a little thread overhead on top. (Note that for I/O like this the GIL is released during the blocking call, so threads could in principle overlap the waits; a corrected pattern is sketched after the listing.)
```python
import requests
import time
import threading

start = time.time()

def Request():
    url = 'http://127.0.0.1:5000'
    print('Waiting for', url)
    result = requests.get(url).text
    print('Get response from', url, 'Result:', result)

for _ in range(10):
    t = threading.Thread(target=Request)   # pass the function itself, not Request()
    t.start()
    t.join()   # joining inside the loop serializes the threads
end = time.time()
print('Cost time:', end - start)

# Output:
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Cost time: 30.073291301727295
```
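For comparison, a variant (my own sketch, not from the original) that starts all the threads first and joins them only afterwards lets the I/O waits overlap, since blocking socket calls release the GIL; against the 3-second test server it should finish in roughly 3 s:

```python
import requests
import threading
import time

def request_once():
    requests.get('http://127.0.0.1:5000')

start = time.time()
threads = [threading.Thread(target=request_once) for _ in range(10)]
for t in threads:
    t.start()   # launch all 10 threads before waiting on any of them
for t in threads:
    t.join()    # now wait; the 3-second server delays overlap
print('Cost time:', time.time() - start)
```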
Using multiprocessing and async coroutines together
Async coroutines and multiprocessing each speed up network requests; to combine the two, John Reese of Facebook examined the respective strengths of asyncio and multiprocessing and developed a new library called aiomultiprocess.
Installing aiomultiprocess
pip3 install aiomultiprocess
Rewriting the request code:
```python
import asyncio
import aiohttp
import time
from aiomultiprocess import Pool

start = time.time()

async def get(url):
    session = aiohttp.ClientSession()
    response = await session.get(url)
    result = await response.text()
    await session.close()   # as before, close() must be awaited
    return result

async def request():
    url = 'http://127.0.0.1:5000'
    urls = [url for _ in range(100)]
    async with Pool() as pool:   # a process pool whose workers run coroutines
        result = await pool.map(get, urls)
        return result

coroutine = request()
task = asyncio.ensure_future(coroutine)
loop = asyncio.get_event_loop()
loop.run_until_complete(task)

end = time.time()
print('Cost time:', end - start)
```
Clever tricks
An async crawler with requests, part 1
As with using the requests module inside asyncio earlier: what if we want to call other blocking functions from asyncio? Although some libraries now support asyncio natively, in practice most I/O modules still do not. A blocking function (file I/O, a requests call) blocks the single thread shared by our code and the asyncio event loop, so the whole application freezes for the duration of the call.
Solution: use the event loop's run_in_executor method. Behind the scenes, the asyncio event loop maintains a ThreadPoolExecutor; we can call run_in_executor and hand it a callable to execute, i.e. run_in_executor runs the expensive function on a new thread.
The run_in_executor method
```python
AbstractEventLoop.run_in_executor(executor, func, *args)
```

- executor should be an Executor instance; if it is None, the default executor is used.
- func is the function to execute.
- args are the arguments passed to func.

Example (using time.sleep())
```python
import asyncio
import time

async def run(url):
    print("start ", url)
    loop = asyncio.get_event_loop()
    try:
        await loop.run_in_executor(None, time.sleep, 1)   # run the blocking sleep on the default thread pool
    except Exception as e:
        print(e)
    print("stop ", url)

url_list = ["https://thief.one", "https://home.nmask.cn", "https://movie.nmask.cn", "https://tool.nmask.cn"]
tasks = [asyncio.ensure_future(run(url)) for url in url_list]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

# Output:
start https://thief.one
start https://home.nmask.cn
start https://movie.nmask.cn
start https://tool.nmask.cn
stop https://home.nmask.cn
stop https://thief.one
stop https://tool.nmask.cn
stop https://movie.nmask.cn
```

With run_in_executor we can build concurrent coroutines out of the familiar blocking modules, without needing I/O libraries written specifically for async.
An async crawler with requests, part 2
Above, run_in_executor added threads to achieve asynchrony. Compared with the multithreading and multiprocessing studied earlier, the asyncio route has one big difference: you cannot pass the function in any way you like (the callable and its arguments are passed separately). So below we first make time.sleep asynchronous, then swap in a function of our own.
Making time.sleep asynchronous (run_in_executor starts new threads, and asyncio coordinates them)
```python
import asyncio
import time

start = time.time()

def myfun(i):
    print('start {}th'.format(i))
    time.sleep(1)
    print('finish {}th'.format(i))

async def main():
    loop = asyncio.get_event_loop()
    futures = (
        loop.run_in_executor(
            None,    # default executor
            myfun,
            i)
        for i in range(10)
    )
    for result in await asyncio.gather(*futures):
        pass

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
end = time.time()
print(end - start)

# Output:
start 0th
start 1th
start 2th
start 3th
start 4th
start 5th
start 6th
start 7th
start 8th
start 9th
finish 0th
finish 3th
finish 2th
finish 1th
finish 5th
finish 4th
finish 6th
finish 7th
finish 8th
finish 9th
1.008263111114502
```

Changing the number of threads used:
```python
import concurrent.futures as cf   # one extra module
import asyncio
import time

start = time.time()

def myfun(i):
    print('start {}th'.format(i))
    time.sleep(1)
    print('finish {}th'.format(i))

async def main():
    with cf.ThreadPoolExecutor(max_workers=10) as executor:   # a pool of 10 threads
        loop = asyncio.get_event_loop()
        futures = (
            loop.run_in_executor(
                executor,   # run on the 10-thread pool
                myfun,
                i)
            for i in range(10)
        )
        for result in await asyncio.gather(*futures):
            pass

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
end = time.time()
print(end - start)
```

Changing the function to one that crawls Douban movie titles (no asynchrony is actually triggered here):
```python
import asyncio
import requests
from bs4 import BeautifulSoup
import time

start = time.time()

async def get_title(a):
    url = 'https://movie.douban.com/top250?start={}&filter='.format(a * 25)
    r = requests.get(url)   # blocking call with nothing awaited, so the coroutines run one by one
    soup = BeautifulSoup(r.content, 'html.parser')
    lis = soup.find('ol', class_='grid_view').find_all('li')
    for li in lis:
        title = li.find('span', class_="title").text
        print(title)

loop = asyncio.get_event_loop()
fun_list = (get_title(i) for i in range(10))
loop.run_until_complete(asyncio.gather(*fun_list))
end = time.time()
print(end - start)

# Output (excerpt):
变脸
卡萨布兰卡
海盗电台
8.284837245941162
```

Rewritten so that run_in_executor starts new threads and asynchrony is actually triggered:
```python
import concurrent.futures as cf
import asyncio
import requests
from bs4 import BeautifulSoup
import time

start = time.time()

def get_title(i):
    url = 'https://movie.douban.com/top250?start={}&filter='.format(i * 25)
    r = requests.get(url)
    soup = BeautifulSoup(r.content, 'html.parser')
    lis = soup.find('ol', class_='grid_view').find_all('li')
    for li in lis:
        title = li.find('span', class_="title").text
        print(title)

async def main():
    with cf.ThreadPoolExecutor(max_workers=10) as executor:
        loop = asyncio.get_event_loop()
        futures = (
            loop.run_in_executor(
                executor,
                get_title,
                i)
            for i in range(10)
        )
        for result in await asyncio.gather(*futures):
            pass

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
end = time.time()
print(end - start)

# Output (excerpt):
当幸福来敲门
乱世佳人
怦然心动
0.8107953071594238
```

The comparison shows how much coroutines can improve I/O-bound work in Python: crawling the 250 Douban movie titles above became an order of magnitude faster.
Addendum
The third-party library uvloop can speed up aiohttp. uvloop is built on libuv, the same library Node.js uses, and it is very easy to adopt. The one caveat is that uvloop only runs on Unix-like systems such as Linux; there is no Windows support.
```python
import asyncio
import uvloop

# install uvloop's event loop policy so all new event loops are uvloop-based
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
```
Conclusion
Studying Python coroutines gave me a fairly complete picture of high concurrency and asynchronous programming in Python, and along the way I reviewed multithreading, multiprocessing, thread pools, and process pools, improving the performance of the tools I write. For now this meets my needs, and it filled in some gaps in my knowledge. The road of learning is long!