所以我的同事只是给了我关于主题的信息。我们有一个非常小的团队,我必须弄清楚如何消耗它。
下面是细节。
第一个月MM_PROXY_STAGE_PASSWORD: xyz
个bootstrap-servers: server1,server2,server3
个Topics: topic1 / topic2
个
在这里我试着从互联网上看
import json
import kafka
def consume_data():
consumer = kafka.KafkaConsumer(
'topic1',
bootstrap_servers = 'localhost:9092',
client_id = 'server1',
key_deserializer = lambda k: k.decode('utf-8'),
value_deserializer = lambda v: json.loads(v.decode('utf-8'))
)
for message in consumer:
print(message)
if __name__ == '__main__':
consume_data()
字符串
这段代码一直在运行,什么也没发生。
1条答案
按热度按时间jv4diomz1#
您可能错过了身份验证和凭据配置步骤。要从Kafka使用数据,需要将凭证(用户名和密码)传递给
KafkaConsumer
对象。将这些添加到代码中:字符串
将“server1:9092,server2:9092,server3:9092”替换为提供的实际引导服务器。另外,将'xyz'更改为正确的凭据(MM_PROXY_STAGE_USERNAME和MM_PROXY_STAGE_PASSWORD)