使用fastapi和aiokafka使用kafka消息

wfauudbj  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(1241)

我想创建一个api来使用fastapi使用kafka主题的消息。
我正在尝试用下面的代码创建一个入口点:

import asyncio
import logging
import json
from aiokafka import AIOKafkaConsumer
from fastapi import APIRouter, HTTPException

from app.config import (
    TOPIC_INGESTED_REQUEST,
    KAFKA_BOOTSTRAP_SERVER,
)

logger = logging.getLogger(__name__)

requests_router = r = APIRouter()

loop = asyncio.get_event_loop()

@r.get("/requests/{group_id}")
async def get_messages_from_kafka(group_id: str):
    # define consumer
    consumer = AIOKafkaConsumer(
        TOPIC_INGESTED_REQUEST,
        loop=loop,
        bootstrap_servers=KAFKA_BOOTSTRAP_SERVER,
        group_id=group_id,  # Consumer must be in a group to commit
        enable_auto_commit=True,  # Is True by default anyway
        auto_commit_interval_ms=1000,  # Autocommit every second
        auto_offset_reset="earliest",  # If committed offset not found, start from beginning
    )

    # start consumer
    await consumer.start()

    retrieved_requests = []
    try:
        data = await consumer.getmany()
        for tp, messages in data.items():
            for message in messages:
                # Process message
                retrieved_requests.append({
                        "key": messages.key.decode("utf-8"),
                        "value": json.loads(message.value.decode("utf-8")),
                    })
    except Exception as e:
        logger.error(f"Error when trying to consume request for {group_id}: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        await consumer.stop()
    return retrieved_requests

我总是以空名单结束。我不明白为什么我不能从配置的主题获取消息。
我还尝试了aiokafka文档中的“经典”异步for循环。但也不管用。但是,使用经典循环,我可以看到创建了一个消费者,并对一些消息进行了消费。与 getmany 它总是空的。
谢谢!

w7t8yxp5

w7t8yxp51#

我终于设法找到了解决办法 getmanyaiokafka 消费者。
这样做的目的是尝试与消费者一起获取消息,并在没有(更多)消息时设置超时。我还设置了要检索的最大消息数,以防消息太多,但这应该是可选的。
代码如下:

requests_router = r = APIRouter()

loop = asyncio.get_event_loop()

def kafka_json_deserializer(serialized):
    return json.loads(serialized)

@r.get("/requests/{group_id}")
async def get_messages_from_kafka(group_id: str):
    """
    Consume a list of 'Requests' from 'TOPIC_INGESTED_REQUEST'.
    """
    consumer = AIOKafkaConsumer(
        TOPIC_INGESTED_REQUEST,
        loop=loop,
        bootstrap_servers=KAFKA_BOOTSTRAP_SERVER,
        group_id=group_id,  # unique identifier for each sidecar
        enable_auto_commit=True,
        auto_commit_interval_ms=1000,  # commit every second
        auto_offset_reset="earliest",  # If committed offset not found, start from beginning
        value_deserializer=kafka_json_deserializer,
    )

    logger.info(
        f"Start consumer with group ID: '{group_id}' on topic '{TOPIC_INGESTED_REQUEST}'."
    )
    await consumer.start()
    logger.info("Consumer started.")

    retrieved_requests = []
    try:
        result = await consumer.getmany(
            timeout_ms=CONSUMER_TIMEOUT_MS, max_records=MAX_RECORDS_PER_CONSUMER
        )
        logger.info(f"Get {len(result)} messages in {TOPIC_INGESTED_REQUEST}.")
        for tp, messages in result.items():
            if messages:
                for message in messages:
                    retrieved_requests.append(
                        {"key": message.key.decode("utf-8"), "value": message.value,}
                    )
                # await consumer.commit({tp: messages[-1].offset + 1})
    except Exception as e:
        logger.error(
            f"Error when trying to consume request for {group_id} on topic {TOPIC_INGESTED_REQUEST}: {str(e)}"
        )
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        await consumer.stop()

    return retrieved_requests

相关问题