flink有状态函数,用pythonsdk异步调用

voase2hg  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(486)

我正在用pythonsdk试用有状态函数2.1api,但我看不到一个明确的方法,即如何在不阻塞应用程序的情况下对外部api进行异步调用。
这是可能的还是有人能让我走上正确的道路?

6ovsh4lw

6ovsh4lw1#

从statefun 2.2.0开始,您可以使用asyncrequestreplyhandler。
从文档中:
pythonsdk附带了一个额外的处理程序asyncrequestreplyhandler,它支持python的可等待函数(协程)。这个处理程序可以用于异步python框架,例如aiohttp。

@functions.bind("example/hello")
async def hello(context, message):
   response = await compute_greeting(message)
   context.reply(response)

from aiohttp import web

handler = AsyncRequestReplyHandler(functions)

async def handle(request):
   req = await request.read()
   res = await handler(req)
   return web.Response(body=res, content_type="application/octet-stream")

app = web.Application()
app.add_routes([web.post('/statefun', handle)])

if __name__ == '__main__':
  web.run_app(app, port=5000)

你可以在这里看到一个完整的例子。

相关问题