在流数据中为给定的messageid缓冲消息

js4nwp54  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(374)

用例:我有消息具有messageid,多个消息可以具有相同的消息id,这些消息存在于按messageid分区的流管道(如kafka)中,因此我确保所有具有相同messageid的消息都将进入同一分区。
所以我需要写一个作业,它应该缓冲消息一段时间(比如说1分钟),然后,将所有具有相同messageid的消息合并到一个大消息中。
我认为可以使用spark数据集和spark sql(或其他什么?)来完成。但是我找不到任何关于如何为给定的消息id存储消息一段时间,然后对这些消息进行聚合的示例/文档。

ar7v8xwq

ar7v8xwq1#

我想你要找的是Spark流。spark有一个kafka连接器,可以链接到spark流上下文。
下面是一个非常基本的示例,它将在1分钟的时间间隔内为给定主题集中的所有消息创建rdd,然后按消息id字段对它们进行分组(您的值序列化程序必须公开这样一个 getMessageId 方法,当然)。

SparkConf conf = new SparkConf().setAppName(appName);
JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.minutes(1));

Map<String, Object> params = new HashMap<String, Object>() {{
    put("bootstrap.servers", kafkaServers);
    put("key.deserializer", kafkaKeyDeserializer);
    put("value.deserializer", kafkaValueDeserializer);
}};

List<String> topics = new ArrayList<String>() {{
    // Add Topics
}};

JavaInputDStream<ConsumerRecord<String, String>> stream =
    KafkaUtils.createDirectStream(ssc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, String>Subscribe(topics, params)
    );

stream.foreachRDD(rdd -> rdd.groupBy(record -> record.value().getMessageId()));

ssc.start();
ssc.awaitTermination();

在流式api中还有其他几种方法可以对消息进行分组。有关更多示例,请参阅文档。

相关问题