如何用Druid宁静(超集)读取Kafka通道的数据?

nxagd54h  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(425)

在ubuntu服务器上,我设置了divolte collector从网站收集clickstream数据。数据正在写入一个名为divolte data的kafka通道。通过设置Kafka消费者,我可以看到以下数据:

V0:j2ive5p1:QHQbOuiuZFozAVQfKqNWJoNstJhEZE85V0:j2pz3aw7:sDHKs71nHrTB5b_1TkKvWWtQ_rZDrvc2D0:B4aEGBSVgTXgxqB85aj4dGeoFjCqpeEGbannerClickMozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/58.0.3029.96 Chrome/58.0.3029.96 Safari/537.36ChromiumChromium8Google Inc. and contributorsBrowser58.0.3029.96"Personal computer
LinuxCanonical Ltd.

然后我想用airbnb超集来可视化数据,它有几个连接到常用数据库的连接器,包括druid.io(它可以读取spark)。
divolte似乎在以非结构化的方式在kafka中存储数据。但显然,它可以以结构化的方式Map数据。输入数据是否应该用json(如docs所说)结构化?
然后如何从Druid宁静读取Kafka通道接收到的数据?我在conf示例中尝试更改通道名称,但是这个消费者随后收到的消息为零。

x8goxv8g

x8goxv8g1#

我发现的解决方案是我可以用python处理kafka消息,例如用kafka python库,或者合流kafka python,然后我将用avro阅读器解码这些消息。
编辑:以下是我如何做到这一点的更新:
我以为avro库只是读取avro文件,但它实际上解决了解码kafka消息的问题,如下所示:我首先导入库并将schema文件作为参数,然后创建一个函数将消息解码到字典中,我可以在consumer循环中使用。

from confluent_kafka import Consumer, KafkaError
from avro.io import DatumReader, BinaryDecoder
import avro.schema

schema = avro.schema.Parse(open("data_sources/EventRecord.avsc").read())
reader = DatumReader(schema)

def decode(msg_value):
    message_bytes = io.BytesIO(msg_value)
    decoder = BinaryDecoder(message_bytes)
    event_dict = reader.read(decoder)
    return event_dict

c = Consumer()
c.subscribe(topic)
running = True
while running:
    msg = c.poll()
    if not msg.error():
        msg_value = msg.value()
        event_dict = decode(msg_value)
        print(event_dict)
    elif msg.error().code() != KafkaError._PARTITION_EOF:
        print(msg.error())
        running = False

相关问题