华企号 后端开发 异步非阻塞实现方式

异步非阻塞实现方式

以下内容根据自己实操和理解进行的整理,欢迎交流~

在tornado的开发中,我们一般会见到以下四个组成部分。

  • ioloop:

同一个ioloop实例运行在一个单线程环境下。

tornado.ioloop.IOLoop.current().start()
  • app

可以有多个app,一般使用一个。会挂接一个或多个服务端套接字端口对外提供服务。

app = tornado.web.Application([
                                (r"/api/predict", PredictHandler),
                                (....) 
                                ], debug=False
                                )
  • 路由表

将服务url与handler对应起来,形成一个路由映射表。当请求到来时,根据请求的访问url查询路由映射表来找到相应的业务handler。

[
    (r"/api/predict", PredictHandler),
    (....) 
]
  • handler

开发时编写的业务逻辑。可以有多个handler,为了可以通过不同的url访问handler,增加一个handler就需要增加一个路由寻址。

class Handler_1(RequestHandler):
    def get(self, *args, **kwargs):
                time.sleep(5)
                self.write("done")

class Handler_2(RequestHandler):
    def get(self, *args, **kwargs):
                time.sleep(5)
                self.write("done")

app = tornado.web.Application([
                            (r"/api/predict_1", Handler_1),
                            (r"/api/predict_2", Handler_2)
                            ], debug=False)

tornado的请求处理流程

  1. 当一个请求到来时,ioloop读取这个请求,解包成一个http请求对象;
  2. tornado找到该请求对象中对应app的路由表,通过路由表查询挂接的handler;
  3. 执行handler。handler方法执行后一般会返回一个对象;
  4. ioloop负责将对象包装成http响应对象序列化发送给客户端;
异步非阻塞实现方式

异步

  • 什么是异步

    tornado的异步非阻塞是针对另一请求来说的,本次的请求如果没有执行完,那依然需要继续执行。

    python代码的异步和tornado的异步差异?

    Python里面的异步是针对代码段来讲,异步的代码段相当于放进后台执行,CPU接着执行异步代码段下一行的代码,这样可以充分利用CPU,毕竟IO的速度可比不上CPU的运行速度,一直等着IO结束多浪费时间。

  • 为什么需要异步?

    提高CPU利用和服务并发

    在传统的同步web服务器中,网络为了实现多并发的功能,就需要和每一个用户保持长连接,这就需要向每一个用户分配一个线程,这是非常昂贵的资源支出。而在进行IO操作时,CPU是处于闲置的状态。

    为了解决上述问题,tornado采用了单线程事件循环,减少并发连接的成本。一次只有一个线程在工作。要想用单线程实现并发,这就要求应用程序是异步非阻塞的。

    需要注意的是tornado的高性能源于Tornado基于Epoll的异步网络IO。但是因为tornado的单线程机制,很容易写出阻塞服务(block)的代码。不但没有性能提高,反而会让性能急剧下降。

  • tornado实现异步的方式

    在一个tornado请求之内,需要做一个I/O耗时的任务。直接写在业务逻辑里可能会block整个服务(也就是其他请求无法访问此服务)。因此可以把这个任务放到异步处理,实现异步的方式就有两种,一种是yield挂起函数,另外一种就是使用类线程池的方式。

    一般网上都会说‘yield生成器方式’,‘使用协程方法的异步非阻塞’,但是都没有提及到特别重要的一点,那就是yield挂起的函数必须是非阻塞函数。如果写了使用了异步方法,但是写了阻塞函数,那么处理请求的方式仍然是同步阻塞的。

同步阻塞:

并发请求多路由地址:

  1. 若handler里面没有耗时IO操作,则会立马返回,多个并行访问感觉上是并行的,实际上由于tornado单线程事件循环机制,实际上是串行处理请求。
  2. 若handler里面有耗时IO操作,不会立马返回,会阻塞在耗时IO操作里面;这时并发请求会阻塞住(因为在排队),迟迟返不回结果。

产生上述的原因是由于taonado的单线程事件循环,每次只有一个线程执行操作。如果线程正在处理阻塞函数,就不能重新获得一个连接,处理并发的请求。

所以tornado要求里面的业务逻辑是异步非阻塞。

异步非阻塞——协程

协程/生成器

tornado推荐使用协程实现异步的方法。python 关键字 yield来实现异步。

Tornado的异步条件:要使用到异步,就必须把IO操作变成非阻塞的IO。这一点非常重要,否则就达不到异步的效果。通过异步,可以释放线程,线程从连接队列获取一个新的连接请求,从而可以处理其他请求。

当采用协程+非阻塞函数进行异步处理时,不管这个IO操作是否有返回结果,当前路由不会跳过耗时函数执行下一行代码。前面说过了,tornado的异步与python的异步不是一回事,或者说针对的对象不一样。但是线程不会一直干等着,在等待的时候可以干别的事情,于是就去重新连接了一个请求,与新的请求打的火热。原来的IO操作执行完毕了,会通知线程返回继续执行下一行代码,

此种方式的严重缺点:

使用 coroutine 方式严重依赖第三方库(需要支持异步)的实现,如果库本身不支持 Tornado 的异步操作,再怎么使用协程也依然会是阻塞的;或者可以参考内置异步客户端,借助tornado.ioloop.IOLoop封装一个自己的异步客户端,但开发成本太高。

基于协程的编程

class SleepHandler(BaseHandler):
    """
    异步的延时10秒
    """
    @gen.coroutine
    def get(self):
        yield gen.sleep(10) # 这里必须是异步函数,I/O操作,否则仍然会阻塞
        self.write("when i sleep 5s")

对于不支持异步的耗时操作,如何使服务不阻塞,可以继续处理其他请求呢?

那就是:基于线程的异步编程

异步非阻塞——线程池异步

由于python解释器使用GIL,多线程只能提高IO的并发能力,不能提高计算的并发能力。因此可以考虑通过子进程的方式,适当增加提供服务的进程数,提高整个系统服务能力的上限

基于线程池的方式,能让tornado的阻塞过程变成非阻塞,其原理是在tornado本身这个线程之外启动一个线程执行阻塞程序,从而变成非阻塞。

线程池为RequestHandler持有,请求处理逻辑中的耗时/阻塞任务可以提交给线程池处理,主循环逻辑可以继续处理其他请求,线程池内的任务处理完毕后,会通过回调注册callback到ioloop,ioloop可以通过执行callback恢复挂起的请求处理逻辑。

需要添加的代码:

  1. 创建线程池:executor = ThreadPoolExecutor(10)
  2. @tornado.gen.coroutine # 使用协程调度 + yield
  3. @tornado.concurrent.run_on_executor

优点:

异步非阻塞服务

小负载的工作,可以起到很好的效果

缺点:

如果大量使用线程化的异步函数做一些高负载的活动,会导致Tornado进程性能低下响应缓慢;

from concurrent.futures import ThreadPoolExecutor

class Executor(ThreadPoolExecutor):
    """ 单例模式
    """
    _instance = None
    def __new__(cls, *args, **kwargs):
        if not getattr(cls, '_instance', None):
            thred_num = 10 # 线程池数量
            cls._instance = ThreadPoolExecutor(max_workers=thred_num)
        return cls._instance

class PredictHandler(RequestHandler):
        # 1.使用单例模式
    executor = Executor()
        # 2.直接创建
        # executor = ThreadPoolExecutor(10)

        @tornado.gen.coroutine  # 使用协程调度
    def get(self, *args, **kwargs):
            result = yield self.main_process(url)
                self.write(....)

        @tornado.concurrent.run_on_executor 
    def main_process(self,url):
                #  do something
                # 会让tornado阻塞的行为
        return sa_result

def create_app():
    return tornado.web.Application([
        (r"/api/predict", PredictHandler),
    ], debug=False) # 开启多进程后,一定要将 debug 设置为 False

app = create_app()
app.listen(8501)
tornado.ioloop.IOLoop.current().start()

如果函数做的是高负载该怎么办?

使用:Tornado 结合 Celery

Tornado 结合 Celery tornado-celery

Celery 是一个简单、灵活且可靠的,处理大量消息的分布式系统,它是一个专注于实时处理的任务队列, 同时也支持任务调度。

它是一个分布式的实时处理消息队列调度系统,tornado接到请求后,可以把所有的复杂业务逻辑处理、数据库操作以及IO等各种耗时的同步任务交给celery,由这个任务队列异步处理完后,再返回给tornado。这样只要保证tornado和celery的交互是异步的,那么整个服务是完全异步的。

作者: 华企网通王鹏程序员

我是程序员王鹏,热爱互联网软件开发和设计,专注于大数据、数据分析、数据库、php、java、python、scala、k8s、docker等知识总结。 我的座右铭:"业精于勤荒于嬉,行成于思毁于随"
上一篇
下一篇

发表回复

联系我们

联系我们

028-84868647

在线咨询: QQ交谈

邮箱: tech@68v8.com

工作时间:周一至周五,9:00-17:30,节假日休息

关注微信
微信扫一扫关注我们

微信扫一扫关注我们

关注微博
返回顶部