如何在consumerrecord中获取字段

yrdbyhpb  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(575)

我写了一个python脚本:


# !/usr/bin/env python

from kafka import KafkaConsumer
consumer = KafkaConsumer('dimon_tcpdump',group_id='zhg_group',bootstrap_servers='192.168.100.9:9092')
for msg in consumer:
    print msg
    # process mes here

消息输出如下:

ConsumerRecord(topic=u'ditopic', partition=0, offset=6280, timestamp=None, timestamp_type=None, key=None, value='myvalue')

我知道输出是一个命名的双重形式。
我的问题是:如何获得 ConsumerRecord ? 例如,我想将值字符串赋给一个变量。

vhmi4jdf

vhmi4jdf1#

正如你所知 msg 属于类型 namedtuple ,只需通过属性查找即可访问其字段,例如:

for msg in consumer:
    value_to_process = msg.value

在comment部分,您似乎在重新分配 msgmsg 签署人:

msg = r'''ConsumerRecord(topic=u'ditopic', partition=0, offset=6280, timestamp=None, timestamp_type=None, key=None, value='myvalue')'''

所以你才会 AttributeError :此时 msg 被覆盖并成为 str 反对,但是 str 没有 value 属性。

rqenqsqc

rqenqsqc2#

这可能与反序列化数据的方式有关。例如,如果您想从 msg . 您将初始化 Consumer 使用: value_deserializer=lambda m: json.loads(m.decode('utf-8')) 所以你的代码看起来像这样:


# !/usr/bin/env python

from kafka import KafkaConsumer
consumer = KafkaConsumer(
   'dimon_tcpdump',
    group_id='zhg_group',
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    bootstrap_servers='192.168.100.9:9092'
    )
for msg in consumer:
    print msg.value
    # process mes here

相关问题