如何使kafkapython或pykafka作为一个异步生产者与uwsgi和gevent一起工作?

zphenhs4  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(378)

我的堆栈是带有gevents的uwsgi。我试图用一个decorator来 Package 我的api端点,以便将所有请求数据(url、方法、主体和响应)推送到kafka主题,但它不起作用。我的理论是因为我使用的是gevents,我试图在异步模式下运行它们,而实际推送到kafka的异步线程不能用gevents运行。如果我尝试使方法同步,那么它也不工作,它在produce worker中死亡,即在produce之后,调用永远不会返回。尽管这两种方法在pythonshell和在线程上运行uwsgi时效果都很好。
遵循示例代码:1。使用kafka python(异步)

try:
        kafka_producer = KafkaProducer(bootstrap_servers=KAFKAHOST.split(','))
    except NoBrokersAvailable:
        logger.info(u'Kafka Host not available: {}'.format(KAFKAHOST))
        kafka_producer = None

    def send_message_to_kafka(topic, key, message):
        """
        :param topic: topic name
        :param key: key to decide partition
        :param message: json serializable object to send
        :return:
        """
        if not kafka_producer:
            logger.info(u'Kafka Host not available: {}'.format(KAFKAHOST))
            return
        data = json.dumps(message)
        try:
            start = time.time()
            kafka_producer.send(topic, key=str(key), value=data)
            logger.info(u'Time take to push to Kafka: {}'.format(time.time() - start))
        except KafkaTimeoutError as e:
            logger.info(u'Message not sent: {}'.format(KAFKAHOST))
            logger.info(e)
            pass
        except Exception as e:
            logger.info(u'Message not sent: {}'.format(KAFKAHOST))
            logger.exception(e)
            pass

与py kafka(同步):

try:
    client = KafkaClient(hosts=KAFKAHOST)
except Exception as e:
    logger.info(u'Kafka Host Not Found: {}'.format(KAFKAHOST))
    client = None

def send_message_to_kafka(topic, key, message):
    """
    :param topic: topic name
    :param key: key to decide partition
    :param message: json serializable object to send
    :return:
    """
    if not client:
        logger.info(u'Kafka Host is None')
        return
    data = json.dumps(message)
    try:
        start = time.time()
        topic = client.topics[topic]
        with topic.get_sync_producer() as producer:
            producer.produce(data, partition_key='{}'.format(key))
        logger.info(u'Time take to push to Kafka: {}'.format(time.time() - start))
    except Exception as e:
        logger.exception(e)
        pass
bt1cpqcv

bt1cpqcv1#

我对皮Kafka有更多的经验,所以我可以回答这个问题。pykafka使用可插入线程处理程序,并且内置了gevent支持。您需要用 use_greenlets=True . 这里的文档
关于你的方法的其他想法。为每条消息创建一个新的主题对象和生产者是非常昂贵的。最好一次创建并重用。


# setup once

client = KafkaClient(hosts=KAFKAHOST, use_greenlets=True)
topic = client.topics[topic]
producer = topic.get_sync_producer() 

def send_message_to_kafka(producer, key, message):
    """
    :param producer: pykafka producer
    :param key: key to decide partition
    :param message: json serializable object to send
    :return:
    """

    data = json.dumps(message)
    try:
        start = time.time()
        producer.produce(data, partition_key='{}'.format(key))
        logger.info(u'Time take to push to Kafka: {}'.format(time.time() - start))
    except Exception as e:
        logger.exception(e)
        pass # for at least once delivery you will need to catch network errors and retry.

最后,Kafka从批处理和压缩中获得了所有的速度。使用sync producer可以防止客户端利用这些功能。它可以工作,但速度较慢,占用更多空间。有些应用程序需要同步,但如果遇到性能瓶颈,重新考虑应用程序以批处理消息可能是有意义的。

相关问题