分析来自kafka消费者的消息

bvjveswy  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(389)

我建立了一个Kafka消费生产者系统,我需要处理传输的信息。这些是json文件中的行,比如

ConsumerRecord(topic=u'json_data103052', partition=0, offset=676, timestamp=1542710197257, timestamp_type=0, key=None, value='{"Name": "Simone", "Surname": "Zimbolli", "gender": "Other", "email": "zzz@uiuc.edu", "country": "Nigeria", "date": "11/07/2018"}', checksum=354265828, serialized_key_size=-1, serialized_value_size=189)

我正在寻找一个易于实施的解决方案
定义流窗口
分析窗口中的消息(计算唯一用户和类似对象的数量)
有人对如何进行有什么建议吗?谢谢。
我有问题使用Spark,所以我宁愿避免它。我正在用jupyter编写python脚本。
这是我的密码:

from kafka import KafkaConsumer
from random import randint
from time import sleep

bootstrap_servers = ['localhost:9092']

%store -r topicName    # Get the topic name from the kafka producer
print topicName

consumer = KafkaConsumer(bootstrap_servers = bootstrap_servers,
                         auto_offset_reset='earliest'
                        )
consumer.subscribe([topicName])

for message in consumer:
    print (message)
3xiyfsfu

3xiyfsfu1#

对于你的场景,Kafka流似乎是合适的。它支持以下4种类型的窗口:

Tumbling time window - Time-based   Fixed-size, non-overlapping, gap-less windows
Hopping time window- Time-based Fixed-size, overlapping windows
Sliding time window- Time-based Fixed-size, overlapping windows that work on differences between record timestamps
Session window

对于python,有一个库:https://github.com/wintoncode/winton-kafka-streams
对你有用。

1qczuiv0

1qczuiv02#

使用Kafka流api是你需要的,我猜。你有所有的功能,你需要的窗口。您可以在此处找到有关Kafka流的更多信息:
https://kafka.apache.org/documentation/streams/

相关问题