我建立了一个Kafka消费生产者系统,我需要处理传输的信息。这些是json文件中的行,比如
ConsumerRecord(topic=u'json_data103052', partition=0, offset=676, timestamp=1542710197257, timestamp_type=0, key=None, value='{"Name": "Simone", "Surname": "Zimbolli", "gender": "Other", "email": "zzz@uiuc.edu", "country": "Nigeria", "date": "11/07/2018"}', checksum=354265828, serialized_key_size=-1, serialized_value_size=189)
我正在寻找一个易于实施的解决方案
定义流窗口
分析窗口中的消息(计算唯一用户和类似对象的数量)
有人对如何进行有什么建议吗?谢谢。
我有问题使用Spark,所以我宁愿避免它。我正在用jupyter编写python脚本。
这是我的密码:
from kafka import KafkaConsumer
from random import randint
from time import sleep
bootstrap_servers = ['localhost:9092']
%store -r topicName # Get the topic name from the kafka producer
print topicName
consumer = KafkaConsumer(bootstrap_servers = bootstrap_servers,
auto_offset_reset='earliest'
)
consumer.subscribe([topicName])
for message in consumer:
print (message)
2条答案
按热度按时间3xiyfsfu1#
对于你的场景,Kafka流似乎是合适的。它支持以下4种类型的窗口:
对于python,有一个库:https://github.com/wintoncode/winton-kafka-streams
对你有用。
1qczuiv02#
使用Kafka流api是你需要的,我猜。你有所有的功能,你需要的窗口。您可以在此处找到有关Kafka流的更多信息:
https://kafka.apache.org/documentation/streams/