在ubuntu服务器上,我设置了divolte collector从网站收集clickstream数据。数据正在写入一个名为divolte data的kafka通道。通过设置Kafka消费者,我可以看到以下数据:
V0:j2ive5p1:QHQbOuiuZFozAVQfKqNWJoNstJhEZE85V0:j2pz3aw7:sDHKs71nHrTB5b_1TkKvWWtQ_rZDrvc2D0:B4aEGBSVgTXgxqB85aj4dGeoFjCqpeEGbannerClickMozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/58.0.3029.96 Chrome/58.0.3029.96 Safari/537.36ChromiumChromium8Google Inc. and contributorsBrowser58.0.3029.96"Personal computer
LinuxCanonical Ltd.
然后我想用airbnb超集来可视化数据,它有几个连接到常用数据库的连接器,包括druid.io(它可以读取spark)。
divolte似乎在以非结构化的方式在kafka中存储数据。但显然,它可以以结构化的方式Map数据。输入数据是否应该用json(如docs所说)结构化?
然后如何从Druid宁静读取Kafka通道接收到的数据?我在conf示例中尝试更改通道名称,但是这个消费者随后收到的消息为零。
1条答案
按热度按时间x8goxv8g1#
我发现的解决方案是我可以用python处理kafka消息,例如用kafka python库,或者合流kafka python,然后我将用avro阅读器解码这些消息。
编辑:以下是我如何做到这一点的更新:
我以为avro库只是读取avro文件,但它实际上解决了解码kafka消息的问题,如下所示:我首先导入库并将schema文件作为参数,然后创建一个函数将消息解码到字典中,我可以在consumer循环中使用。