如何消费Kafka主题?

2o7dmzc5  于 2023-08-02  发布在  Apache
关注(0)|答案(1)|浏览(130)

所以我的同事只是给了我关于主题的信息。我们有一个非常小的团队,我必须弄清楚如何消耗它。
下面是细节。
第一个月
MM_PROXY_STAGE_PASSWORD: xyz
bootstrap-servers: server1,server2,server3
Topics: topic1 / topic2
在这里我试着从互联网上看

import json
import kafka

def consume_data():
    consumer = kafka.KafkaConsumer(
        'topic1', 
        bootstrap_servers = 'localhost:9092',
        client_id = 'server1',
        key_deserializer = lambda k: k.decode('utf-8'),
        value_deserializer = lambda v: json.loads(v.decode('utf-8'))
    )
    
    for message in consumer:
        print(message)
        
if __name__ == '__main__':
    consume_data()

字符串
这段代码一直在运行,什么也没发生。

jv4diomz

jv4diomz1#

您可能错过了身份验证和凭据配置步骤。要从Kafka使用数据,需要将凭证(用户名和密码)传递给KafkaConsumer对象。将这些添加到代码中:

import json
from kafka import KafkaConsumer

def consume_data():
    consumer = KafkaConsumer(
        'topic1',
        bootstrap_servers='server1:9092,server2:9092,server3:9092',
        security_protocol='SASL_PLAINTEXT',
        sasl_mechanism='PLAIN',
        sasl_plain_username='xyz',
        sasl_plain_password='xyz',
        value_deserializer=lambda v: json.loads(v.decode('utf-8'))
    )

    for message in consumer:
        print(message.value)

if __name__ == '__main__':
    consume_data()

字符串
将“server1:9092,server2:9092,server3:9092”替换为提供的实际引导服务器。另外,将'xyz'更改为正确的凭据(MM_PROXY_STAGE_USERNAME和MM_PROXY_STAGE_PASSWORD)

相关问题