详解python之协程gevent模块

时间:2021-05-22

Gevent官网文档地址:http://plete')

上例中,我们将超时设为2秒,此后所有协程的运行,如果超过两秒就会抛出”Timeout”异常。我们也可以将超时设置在with语句内,这样该设置只在with语句块中有效:

with Timeout(1): gevent.sleep(10)

此外,我们可以指定超时所抛出的异常,来替换默认的”Timeout”异常。比如下例中超时就会抛出我们自定义的”TooLong”异常。

class TooLong(Exception): passwith Timeout(1, TooLong): gevent.sleep(10)

协程间通信

事件(Event)对象

greenlet协程间的异步通讯可以使用事件(Event)对象。该对象的”wait()”方法可以阻塞当前协程,而”set()”方法可以唤醒之前阻塞的协程。在下面的例子中,5个waiter协程都会等待事件evt,当setter协程在3秒后设置evt事件,所有的waiter协程即被唤醒。

#!/usr/bin/env python# _*_ coding utf-8 _*_#Author: aaronimport geventfrom gevent.event import Eventevt = Event()def setter(): print 'Wait for me' gevent.sleep(3) # 3秒后唤醒所有在evt上等待的协程 print "Ok, I'm done" evt.set() # 唤醒def waiter(): print "I'll wait for you" evt.wait() # 等待 print 'Finish waiting'gevent.joinall([ gevent.spawn(setter), gevent.spawn(waiter), gevent.spawn(waiter), gevent.spawn(waiter), gevent.spawn(waiter), gevent.spawn(waiter)])

AsyncResult事件

除了Event事件外,gevent还提供了AsyncResult事件,它可以在唤醒时传递消息。让我们将上例中的setter和waiter作如下改动:

#!/usr/bin/env python# _*_ coding utf-8 _*_#Author: aaronfrom gevent.event import AsyncResultaevt = AsyncResult()def setter(): print 'Wait for me' gevent.sleep(3) # 3秒后唤醒所有在evt上等待的协程 print "Ok, I'm done" aevt.set('Hello!') # 唤醒,并传递消息def waiter(): print("I'll wait for you") message = aevt.get() # 等待,并在唤醒时获取消息 print 'Got wake up message: %s' % message

队列 Queue

队列Queue的概念相信大家都知道,我们可以用它的put和get方法来存取队列中的元素。gevent的队列对象可以让greenlet协程之间安全的访问。运行下面的程序,你会看到3个消费者会分别消费队列中的产品,且消费过的产品不会被另一个消费者再取到:

#!/usr/bin/env python# _*_ coding utf-8 _*_#Author: aaron<br>import geventfrom gevent.queue import Queueproducts = Queue()def consumer(name): #while not products.empty(): while True: try: print('%s got product %s' % (name, products.get_nowait())) gevent.sleep(0) except gevent.queue.Empty: break print('Quit')def producer(): for i in range(1, 10): products.put(i)gevent.joinall([ gevent.spawn(producer), gevent.spawn(consumer, 'steve'), gevent.spawn(consumer, 'john'), gevent.spawn(consumer, 'nancy'),])

注意:协程队列跟线程队列是一样的,put和get方法都是阻塞式的,它们都有非阻塞的版本:put_nowait和get_nowait。如果调用get方法时队列为空,则是不会抛出”gevent.queue.Empty”异常。我们只能使用get_nowait()的方式让气抛出异常。

信号量

信号量可以用来限制协程并发的个数。它有两个方法,acquire和release。顾名思义,acquire就是获取信号量,而release就是释放。当所有信号量都已被获取,那剩余的协程就只能等待任一协程释放信号量后才能得以运行:

#!/usr/bin/env python# _*_ coding utf-8 _*_#Author: aaronimport geventfrom gevent.coros import BoundedSemaphoresem = BoundedSemaphore(2)def worker(n): sem.acquire() print('Worker %i acquired semaphore' % n) gevent.sleep(0) sem.release() print('Worker %i released semaphore' % n)gevent.joinall([gevent.spawn(worker, i) for i in xrange(0, 6)])

上面的例子中,我们初始化了”BoundedSemaphore”信号量,并将其个数定为2。所以同一个时间,只能有两个worker协程被调度。程序运行后的结果如下:

Worker 0 acquired semaphore
Worker 1 acquired semaphore
Worker 0 released semaphore
Worker 1 released semaphore
Worker 2 acquired semaphore
Worker 3 acquired semaphore
Worker 2 released semaphore
Worker 3 released semaphore
Worker 4 acquired semaphore
Worker 4 released semaphore
Worker 5 acquired semaphore
Worker 5 released semaphore

如果信号量个数为1,那就等同于同步锁。

协程本地变量

同线程类似,协程也有本地变量,也就是只在当前协程内可被访问的变量:

#!/usr/bin/env python# _*_ coding utf-8 _*_#Author: aaronimport geventfrom gevent.local import localdata = local()def f1(): data.x = 1 print data.xdef f2(): try: print data.x except AttributeError: print 'x is not visible'gevent.joinall([ gevent.spawn(f1), gevent.spawn(f2)])

通过将变量存放在local对象中,即可将其的作用域限制在当前协程内,当其他协程要访问该变量时,就会抛出异常。不同协程间可以有重名的本地变量,而且互相不影响。因为协程本地变量的实现,就是将其存放在以的”greenlet.getcurrent()”的返回为键值的私有的命名空间内。

多并发socket模型

服务器端:

#!/usr/bin/env python# _*_ coding utf-8 _*_#Author: aaronimport socketimport geventfrom gevent import socket, monkeymonkey.patch_all()def server(port): s = socket.socket() s.bind(('0.0.0.0', port)) s.listen(500) while True: cli, addr = s.accept() gevent.spawn(handle_request, cli)def handle_request(conn): try: while True: data = conn.recv(1024) print("recv:", data) conn.send(data) if not data: conn.shutdown(socket.SHUT_WR) except Exception as ex: print(ex) finally: conn.close()if __name__ == '__main__': server(8001)

当客户端连接上服务器端时,服务器端通过开辟一个协程与该客户端完成交互任务,同时由于使用了Gevent协程的方式,在每个客户端与服务器交互时,并不会影响到服务器端的工作。

客户端: 

#!/usr/bin/env python# _*_ coding utf-8 _*_#Author: aaronimport socketHOST = 'localhost' # The remote hostPORT = 8001 # The same port as used by the servers = socket.socket(socket.AF_INET, socket.SOCK_STREAM)s.connect((HOST, PORT))while True: msg = bytes(input(">>:"), encoding="utf8") s.sendall(msg) data = s.recv(1024) # print(data) print('Received', repr(data)) # repr 格式化输出s.close()

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。

声明:本页内容来源网络,仅供用户参考;我单位不保证亦不表示资料全面及准确无误,也不保证亦不表示这些资料为最新信息,如因任何原因,本网内容或者用户因倚赖本网内容造成任何损失或损害,我单位将不会负任何法律责任。如涉及版权问题,请提交至online#300.cn邮箱联系删除。

相关文章