Web Services: have Tornado serve requests on separate threads

ql3eal8s · asked on 2022-11-15

I have a web service written in Flask, wrapped in a WSGIContainer and served by Tornado through its FallbackHandler mechanism. One of the routes in my Flask web service runs a very long operation (it takes about 5 minutes to complete), and while this route is being handled, every call to any other route is blocked until the operation finishes. How can I fix this?
This is how the Flask application is served by Tornado:

import os

from tornado.options import parse_command_line
from tornado.web import Application, FallbackHandler, StaticFileHandler
from tornado.wsgi import WSGIContainer

parse_command_line()

frontend_path = os.path.join(os.path.dirname(__file__), "..", "webapp")

rest_app = WSGIContainer(app)
tornado_app = Application(
    [
        (r"/api/(.*)", FallbackHandler, dict(fallback=rest_app)),
        (r"/app/(.*)", StaticFileHandler, dict(path=frontend_path)),
    ]
)

frebpwbc1#

I created a custom WSGIHandler that supports multithreaded requests for WSGI applications in Tornado by using a ThreadPoolExecutor. All calls into the WSGI app are executed in separate threads, so the main loop stays free even while the WSGI response takes a long time. The following code is based on this Gist, extended so that:

  • You can stream responses (using an iterator response) or large files directly from the WSGI application to the client, which keeps memory usage low even while generating large responses.
  • If the request body exceeds 1 MB, the whole request body is dumped into a temporary file, which is then passed to the WSGI application.

So far the code has only been tested on Python 3.4, so I don't know whether it works on Python 2.7. It hasn't been stress-tested either, but it seems to work quite well so far.

# tornado_wsgi.py

import itertools
import logging
import sys
import tempfile
from concurrent import futures
from io import BytesIO

from tornado import escape, gen, web
from tornado.iostream import StreamClosedError
from tornado.wsgi import to_wsgi_str

_logger = logging.getLogger(__name__)

@web.stream_request_body
class WSGIHandler(web.RequestHandler):
    thread_pool_size = 20

    def initialize(self, wsgi_application):
        self.wsgi_application = wsgi_application

        self.body_chunks = []
        self.body_tempfile = None

    def environ(self, request):
        """
        Converts a `tornado.httputil.HTTPServerRequest` to a WSGI environment.
        """
        hostport = request.host.split(":")
        if len(hostport) == 2:
            host = hostport[0]
            port = int(hostport[1])
        else:
            host = request.host
            port = 443 if request.protocol == "https" else 80

        if self.body_tempfile is not None:
            body = self.body_tempfile
            body.seek(0)
        elif self.body_chunks:
            body = BytesIO(b''.join(self.body_chunks))
        else:
            body = BytesIO()

        environ = {
            "REQUEST_METHOD": request.method,
            "SCRIPT_NAME": "",
            "PATH_INFO": to_wsgi_str(escape.url_unescape(request.path, encoding=None, plus=False)),
            "QUERY_STRING": request.query,
            "REMOTE_ADDR": request.remote_ip,
            "SERVER_NAME": host,
            "SERVER_PORT": str(port),
            "SERVER_PROTOCOL": request.version,
            "wsgi.version": (1, 0),
            "wsgi.url_scheme": request.protocol,
            "wsgi.input": body,
            "wsgi.errors": sys.stderr,
            "wsgi.multithread": False,
            "wsgi.multiprocess": True,
            "wsgi.run_once": False,
        }
        if "Content-Type" in request.headers:
            environ["CONTENT_TYPE"] = request.headers.pop("Content-Type")
        if "Content-Length" in request.headers:
            environ["CONTENT_LENGTH"] = request.headers.pop("Content-Length")
        for key, value in request.headers.items():
            environ["HTTP_" + key.replace("-", "_").upper()] = value
        return environ

    def prepare(self):
        # Accept up to 2GB upload data.
        self.request.connection.set_max_body_size(2 << 30)

    @gen.coroutine
    def data_received(self, chunk):
        if self.body_tempfile is not None:
            yield self.executor.submit(lambda: self.body_tempfile.write(chunk))
        else:
            self.body_chunks.append(chunk)

            # When the request body grows larger than 1 MB we dump all received chunks into
            # a temporary file to prevent high memory use. All subsequent body chunks will
            # be directly written into the tempfile.
            if sum(len(c) for c in self.body_chunks) > (1 << 20):
                self.body_tempfile = tempfile.NamedTemporaryFile('w+b')
                def copy_to_file():
                    for c in self.body_chunks:
                        self.body_tempfile.write(c)
                    # Remove the chunks to clear the memory.
                    self.body_chunks[:] = []
                yield self.executor.submit(copy_to_file)

    @web.asynchronous
    @gen.coroutine
    def get(self):
        data = {}
        response = []

        def start_response(status, response_headers, exc_info=None):
            data['status'] = status
            data['headers'] = response_headers
            return response.append

        environ = self.environ(self.request)
        app_response = yield self.executor.submit(self.wsgi_application, environ, start_response)
        app_response = iter(app_response)

        if not data:
            raise Exception('WSGI app did not call start_response')

        try:
            exhausted = object()

            def next_chunk():
                try:
                    return next(app_response)
                except StopIteration:
                    return exhausted

            for i in itertools.count():
                chunk = yield self.executor.submit(next_chunk)
                if i == 0:
                    status_code, reason = data['status'].split(None, 1)
                    status_code = int(status_code)
                    headers = data['headers']
                    self.set_status(status_code, reason)
                    for key, value in headers:
                        self.set_header(key, value)
                    c = b''.join(response)
                    if c:
                        self.write(c)
                        yield self.flush()
                if chunk is not exhausted:
                    self.write(chunk)
                    yield self.flush()
                else:
                    break
        except StreamClosedError:
            _logger.debug('stream closed early')
        finally:
            # Close the temporary file to make sure that it gets deleted.
            if self.body_tempfile is not None:
                try:
                    self.body_tempfile.close()
                except OSError as e:
                    _logger.warning(e)

            if hasattr(app_response, 'close'):
                yield self.executor.submit(app_response.close)

    post = put = delete = head = options = get

    @property
    def executor(self):
        cls = type(self)
        if not hasattr(cls, '_executor'):
            cls._executor = futures.ThreadPoolExecutor(cls.thread_pool_size)
        return cls._executor

Below is a simple Flask application that demonstrates the WSGIHandler. The hello() function blocks for one second, so if your ThreadPoolExecutor uses 20 threads you can serve 20 of these requests at the same time (within one second).
The stream() function creates an iterator response and streams 50 chunks of data to the client over 5 seconds. Since every pull on the iterator results in a new executor.submit(), it is very likely that different chunks of a streamed response will be produced by different threads, which breaks Flask's reliance on thread-local variables.

import time
from flask import Flask, Response
from tornado import ioloop, log, web
from tornado_wsgi import WSGIHandler

def main():
    app = Flask(__name__)

    @app.route("/")
    def hello():
        time.sleep(1)
        return "Hello World!"

    @app.route("/stream")
    def stream():
        def generate():
            for i in range(50):
                time.sleep(0.1)
                yield '%d\n' % i
        return Response(generate(), mimetype='text/plain')

    application = web.Application([
        (r'/.*', WSGIHandler, {'wsgi_application': app}),
    ])

    log.enable_pretty_logging()
    application.listen(8888)
    ioloop.IOLoop.instance().start()

if __name__ == '__main__':
    main()

mxg2im7a2#

Tornado's WSGIContainer does not scale well, and it should only be used when you have a specific reason to combine a WSGI application and a Tornado application in the same process. Any operation that may take a long time needs to use Tornado's native asynchronous interfaces instead of WSGI.
See the warning in the docs:
WSGI is a synchronous interface, while Tornado's concurrency model is based on single-threaded asynchronous execution. This means that running a WSGI app with Tornado's WSGIContainer is less scalable than running the same app in a multi-threaded WSGI server like gunicorn or uwsgi. Use WSGIContainer only when there are benefits to combining Tornado and WSGI in the same process that outweigh the reduced scalability.
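In practice, "using Tornado's native asynchronous interfaces" for a blocking route means writing it as a native RequestHandler and pushing the blocking call onto a thread pool, so the IOLoop stays free. A minimal sketch, where long_running_job() is a hypothetical stand-in for the question's 5-minute operation:

import time

from concurrent.futures import ThreadPoolExecutor

from tornado import gen, web
from tornado.ioloop import IOLoop

executor = ThreadPoolExecutor(4)

def long_running_job():
    # Hypothetical stand-in for the 5-minute operation from the question.
    time.sleep(300)
    return "done"

class LongJobHandler(web.RequestHandler):
    @gen.coroutine
    def get(self):
        # The blocking call runs on the thread pool; the IOLoop keeps
        # serving other requests while we wait for the result.
        result = yield IOLoop.current().run_in_executor(executor, long_running_job)
        self.write(result)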


sr4lhrrt3#

You could consider using tornado-threadpool. With it, your request returns immediately and the time-consuming task completes in the background:

from thread_pool import in_thread_pool
from flask import flash, render_template

@app.route('/wait')
def wait():
    time_consuming_task()
    flash('Time consuming task running in background...')
    return render_template('index.html')

@in_thread_pool
def time_consuming_task():
    import time
    time.sleep(5)
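If you would rather avoid the extra dependency, the same fire-and-forget decorator can be sketched with the standard library's concurrent.futures (assuming the response does not need the task's result):

from concurrent.futures import ThreadPoolExecutor
from functools import wraps

_pool = ThreadPoolExecutor(4)

def in_thread_pool(func):
    # Fire-and-forget: submit the call to the pool and return the Future
    # immediately instead of blocking on the result.
    @wraps(func)
    def wrapper(*args, **kwargs):
        return _pool.submit(func, *args, **kwargs)
    return wrapper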

pgx2nnw84#

You can use Ladon's task-type methods to handle these long-running operations.
It provides a framework-level solution for exactly this kind of situation.
Ladon Tasks documentation


pcrecxhr5#

When using Tornado together with Flask, take a look at the source code of the WSGIContainer model, and... check out the example code below!

from concurrent.futures import ThreadPoolExecutor
import tornado.gen
from tornado.wsgi import WSGIContainer
from tornado.httpserver import HTTPServer
from tornado.ioloop import IOLoop
from tornado import escape
from tornado import httputil
from typing import List, Tuple, Optional, Callable, Any, Dict
from types import TracebackType

__all__ = ("WSGIContainer_With_Thread",)

class WSGIContainer_With_Thread(WSGIContainer):

    executor = ThreadPoolExecutor(30)

    @tornado.gen.coroutine
    def __call__(self, request):
        data = {}  # type: Dict[str, Any]
        response = []  # type: List[bytes]

        def start_response(
                status: str,
                _headers: List[Tuple[str, str]],
                exc_info: Optional[
                    Tuple[
                        "Optional[Type[BaseException]]",
                        Optional[BaseException],
                        Optional[TracebackType],
                    ]
                ] = None,
        ) -> Callable[[bytes], Any]:
            data["status"] = status
            data["headers"] = _headers
            return response.append

        loop = tornado.ioloop.IOLoop.instance()
        app_response = yield loop.run_in_executor(
            self.executor, self.wsgi_application, WSGIContainer.environ(request), start_response
        )

        # --*-- put this into some executor --*--
        # app_response = self.wsgi_application(
        #     WSGIContainer.environ(request), start_response
        # )
        # --*-- put this into some executor --*--

        try:
            response.extend(app_response)
            body = b"".join(response)
        finally:
            if hasattr(app_response, "close"):
                app_response.close()  # type: ignore
        if not data:
            raise Exception("WSGI app did not call start_response")

        status_code_str, reason = data["status"].split(" ", 1)
        status_code = int(status_code_str)
        headers = data["headers"]  # type: List[Tuple[str, str]]
        header_set = set(k.lower() for (k, v) in headers)
        body = escape.utf8(body)
        if status_code != 304:
            if "content-length" not in header_set:
                headers.append(("Content-Length", str(len(body))))
            if "content-type" not in header_set:
                headers.append(("Content-Type", "text/html; charset=UTF-8"))
        if "server" not in header_set:
            headers.append(("Server", "TornadoServer/%s" % tornado.version))

        start_line = httputil.ResponseStartLine("HTTP/1.1", status_code, reason)
        header_obj = httputil.HTTPHeaders()
        for key, value in headers:
            header_obj.add(key, value)
        assert request.connection is not None
        request.connection.write_headers(start_line, header_obj, chunk=body)
        request.connection.finish()
        self._log(status_code, request)

if __name__ == '__main__':

    import time
    from flask import Flask

    app = Flask(__name__)

    @app.route('/1')
    def index1():
        time.sleep(5)
        return f'OK 1 - {int(time.time())}'

    @app.route('/2')
    def index2():
        time.sleep(5)
        return f'OK 2 - {int(time.time())}'

    @app.route('/3')
    def index3():
        return f'OK 3 - {int(time.time())}'

    http_server = HTTPServer(WSGIContainer_With_Thread(app))
    http_server.listen(5000)
    IOLoop.instance().start()

When running this example, the Tornado application listens on port 5000, and we can run a few tests:
1. Request route '/1' and route '/2' at the same time; you should get both responses at the same time (both within 5 seconds).
2. Request route '/1' and route '/3' at the same time; you should get the response from '/3' immediately and the response from '/1' within 5 seconds.
3. Request route '/1' twice at the same time (e.g. in two different browser tabs); you should get the first response from '/1' within 5 seconds and the second one within 10 seconds.
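A quick way to reproduce the second test from Python, assuming the example above is running on localhost:5000:

import time
from concurrent.futures import ThreadPoolExecutor
from urllib.request import urlopen

def fetch(path):
    start = time.time()
    body = urlopen('http://localhost:5000' + path).read().decode()
    return path, body, time.time() - start

with ThreadPoolExecutor(2) as pool:
    # '/3' should answer almost immediately, '/1' after about 5 seconds.
    for path, body, elapsed in pool.map(fetch, ['/1', '/3']):
        print('%s -> %s (%.1fs)' % (path, body, elapsed))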
