kafka消费者在尝试使用spark处理消息时多次使用消息

2cmtqfgy  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(634)

我有一个kafka消费者,它从一个主题中读取消息,并使用spark将其写入一个配置单元表。当我在Yarn上运行代码时,它会多次读取相同的消息。我有大约100000条关于这个主题的信息。但是,我的消费者一直在多次阅读相同的内容。当我做一个不同的计算时,我得到了实际的计数。
这是我写的代码。我想知道我是否错过了任何设置。

val spark = SparkSession.builder()
      .appName("Kafka Consumer")
      .enableHiveSupport()
      .getOrCreate()

    import spark.implicits._

    val kafkaConsumerProperty = new Properties()
    kafkaConsumerProperty.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "---")
    kafkaConsumerProperty.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    kafkaConsumerProperty.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    kafkaConsumerProperty.put(ConsumerConfig.GROUP_ID_CONFIG, "draw_attributes")
    kafkaConsumerProperty.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    kafkaConsumerProperty.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
    val topic = "space_orchestrator"
    val kafkaConsumer = new KafkaConsumer[String,String](kafkaConsumerProperty)
    kafkaConsumer.subscribe(Collections.singletonList(topic))

    while(true){

      val recordSeq = kafkaConsumer.poll(10000).toSeq.map( x => x.value())
      if(!recordSeq.isEmpty)
        {
          val newDf = spark.read.json(recordSeq.toDS)
          newDf.write.mode(SaveMode.Overwrite).saveAsTable("dmart_dev.draw_attributes")
        }
    }
ddrv8njm

ddrv8njm1#

或者,尝试手动设置偏移。为此,应禁用自动提交( enable.auto.commit = false ). 对于手动提交,kafkaconsumers提供了两种方法,即 commitSync() 以及 commitAsync() . 顾名思义,commitsync()是一个阻塞调用,它在成功提交偏移量之后返回,而commitsync()则立即返回。

相关问题