用例:我有消息具有messageid,多个消息可以具有相同的消息id,这些消息存在于按messageid分区的流管道(如kafka)中,因此我确保所有具有相同messageid的消息都将进入同一分区。
所以我需要写一个作业,它应该缓冲消息一段时间(比如说1分钟),然后,将所有具有相同messageid的消息合并到一个大消息中。
我认为可以使用spark数据集和spark sql(或其他什么?)来完成。但是我找不到任何关于如何为给定的消息id存储消息一段时间,然后对这些消息进行聚合的示例/文档。
用例:我有消息具有messageid,多个消息可以具有相同的消息id,这些消息存在于按messageid分区的流管道(如kafka)中,因此我确保所有具有相同messageid的消息都将进入同一分区。
所以我需要写一个作业,它应该缓冲消息一段时间(比如说1分钟),然后,将所有具有相同messageid的消息合并到一个大消息中。
我认为可以使用spark数据集和spark sql(或其他什么?)来完成。但是我找不到任何关于如何为给定的消息id存储消息一段时间,然后对这些消息进行聚合的示例/文档。
1条答案
按热度按时间ar7v8xwq1#
我想你要找的是Spark流。spark有一个kafka连接器,可以链接到spark流上下文。
下面是一个非常基本的示例,它将在1分钟的时间间隔内为给定主题集中的所有消息创建rdd,然后按消息id字段对它们进行分组(您的值序列化程序必须公开这样一个
getMessageId
方法,当然)。在流式api中还有其他几种方法可以对消息进行分组。有关更多示例,请参阅文档。