我想创建一个api来使用fastapi使用kafka主题的消息。
我正在尝试用下面的代码创建一个入口点:
import asyncio
import logging
import json
from aiokafka import AIOKafkaConsumer
from fastapi import APIRouter, HTTPException
from app.config import (
TOPIC_INGESTED_REQUEST,
KAFKA_BOOTSTRAP_SERVER,
)
logger = logging.getLogger(__name__)
requests_router = r = APIRouter()
loop = asyncio.get_event_loop()
@r.get("/requests/{group_id}")
async def get_messages_from_kafka(group_id: str):
# define consumer
consumer = AIOKafkaConsumer(
TOPIC_INGESTED_REQUEST,
loop=loop,
bootstrap_servers=KAFKA_BOOTSTRAP_SERVER,
group_id=group_id, # Consumer must be in a group to commit
enable_auto_commit=True, # Is True by default anyway
auto_commit_interval_ms=1000, # Autocommit every second
auto_offset_reset="earliest", # If committed offset not found, start from beginning
)
# start consumer
await consumer.start()
retrieved_requests = []
try:
data = await consumer.getmany()
for tp, messages in data.items():
for message in messages:
# Process message
retrieved_requests.append({
"key": messages.key.decode("utf-8"),
"value": json.loads(message.value.decode("utf-8")),
})
except Exception as e:
logger.error(f"Error when trying to consume request for {group_id}: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
finally:
await consumer.stop()
return retrieved_requests
我总是以空名单结束。我不明白为什么我不能从配置的主题获取消息。
我还尝试了aiokafka文档中的“经典”异步for循环。但也不管用。但是,使用经典循环,我可以看到创建了一个消费者,并对一些消息进行了消费。与 getmany
它总是空的。
谢谢!
1条答案
按热度按时间w7t8yxp51#
我终于设法找到了解决办法
getmany
从aiokafka
消费者。这样做的目的是尝试与消费者一起获取消息,并在没有(更多)消息时设置超时。我还设置了要检索的最大消息数,以防消息太多,但这应该是可选的。
代码如下: