如何消除重复的消息,而Kafka流使用Spark流?

elcex8rz  于 2021-06-07  发布在  Kafka
关注(0)|答案(6)|浏览(575)

我有一个案例,Kafka制作人每天发送两次数据。这些生产者从数据库/文件中读取所有数据并发送给Kafka。所以这些信息每天都在发送,而且是重复的。我需要消除重复的消息,并写在一些持久性存储使用Spark流。在这种情况下,删除重复消息的最佳方法是什么?
发送的重复消息是一个json字符串,其时间戳字段仅更新。
note:i can“不要将kafka producer更改为只发送新的数据/消息,它已安装在客户端计算机中并由其他人编写。”。

fxnxkyjh

fxnxkyjh1#

一个更简单的方法是在Kafka结束时解决这个问题。看看Kafka的日志压缩功能。如果记录具有相同的唯一密钥,它将为您消除记录中的重复数据。
https://kafka.apache.org/documentation/#compaction

wdebmtf2

wdebmtf22#

您可以使用一个键值数据存储,其中您的键将是不包括timestamp字段和实际json值的字段的组合。
在轮询记录时,创建键和值对,并将其写入数据存储,该数据存储处理upsert(insert+update)或检查该键是否存在于数据存储中,然后删除消息

if(Datastore.get(key)){ 
     // then drop
 }else { 
    //write to the datastore
    Datastore.put(key)
}

我建议您检查hbase(处理upserts)和redis(用于查找的内存数据存储)

fhg3lkii

fhg3lkii3#

你可以试着用 mapWithState . 检查我的答案。

rhfm7lfc

rhfm7lfc4#

你调查过这个吗:https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#streaming-重复数据消除
您可以尝试使用dropduplicates()方法。如果需要使用多个列来确定重复项,可以使用dropduplicates(string[]colnames)传递它们。

tcbh2hod

tcbh2hod5#

您可以将主题配置更改为 compact 模式。通过压缩,具有相同密钥的记录将在kafka日志中被覆盖/更新。在那里你只能从Kafka那里得到钥匙的最新值。
你可以在这里阅读更多关于压实的内容。

wpx232ag

wpx232ag6#

对于重复数据消除,您需要将有关已处理内容的信息(例如消息的唯一ID)存储在某个位置。
要存储邮件,您可以使用:
Spark检查站。优点:开箱即用。缺点:如果你更新应用程序的源代码,你需要清理检查点。因此,您将丢失信息。如果对重复数据消除的要求不严格,解决方案可以工作。
任何数据库。例如,如果您在hadoop env上运行,那么可以使用hbase。对于每一条您确实“得到”的消息(检查它以前没有发送过),并在db sent中标记它真正发送的时间。

相关问题