使用asyncio执行异步任务

ioekq8ef  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(308)

我有一个python flask应用程序,它在kafka中推送任务,以便使用faust进行处理。我正在执行浮士德函数 send_data 就像 loop.run_until_complete(send_data()) 在我写的文件的顶端

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

我的问题是,我必须写下面的代码在每一页我有调用浮士德函数

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

代码库
主.py

import os
from flask import Flask
from page_views import call

os.environ.setdefault('FAUST_LOOP', 'eventlet')
app = Flask(__name__)

@app.route("/")
def hello():
  return call()

页面\u views.py

import faust
from faust.types import StreamT
import asyncio

app = faust.App(
  'page_views',
  broker='kafka://localhost:9092',
  origin='faust-app'
)

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

class PageView(faust.Record):
  id: str
  user: str

page_view_topic = app.topic('page_view_topic', value_type=PageView)

# add table if didn't worked

page_views = app.Table('page_views', default=int)

@app.agent(page_view_topic)
async def count_page_views(views: StreamT[PageView]):
  async for view in views:
    print(view.id)

async def send_data():
  await count_page_views.send(value=PageView(id=100, user="sahilpaudel"))

def call():
  loop.run_until_complete(send_data())
  return "hello world"

像page_views.py一样,我需要调用其他异步任务和路由。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题