使用spark流媒体将数据发布到kafka主题时复制

n53p2ov0  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(436)

我有一个spark流应用程序,它使用来自topic1的数据并对其进行解析,然后将相同的记录发布到两个进程中一个是topic2,另一个是hive表。在将数据发布到kafka topic2时,我在配置单元表中看到重复的数据,但没有看到重复的数据
使用spark 2.2,kafka 0.10.0

KafkaWriter.write(spark, storeSalesStreamingFinalDF, config)
writeToHIVE(spark, storeSalesStreamingFinalDF, config)

object KafkaWriter {

  def write(spark: SparkSession, df: DataFrame, config: Config)
  {
    df.select(to_json(struct("*")) as 'value)
      .write
      .format("kafka")
      .option("kafka.bootstrap.servers", config.getString("kafka.dev.bootstrap.servers"))
      .option("topic",config.getString("kafka.topic"))
      .option("kafka.compression.type",config.getString("kafka.compression.type"))
      .option("kafka.session.timeout.ms",config.getString("kafka.session.timeout.ms"))
      .option("kafka.request.timeout.ms",config.getString("kafka.request.timeout.ms"))
      .save()
  }
}

有人能帮忙吗,
在Kafka主题2中不需要重复。

lx0bsm1f

lx0bsm1f1#

为了处理重复数据,我们应该设置 .option("kafka.processing.guarantee","exactly_once" )

相关问题