Kafka只在指定时间后使用消息

cs7cruho  于 2021-06-05  发布在  Kafka
关注(0)|答案(1)|浏览(332)

我有一个Kafka的主题。
一个月后,一个消息被放到这个主题上,我必须采取行动。
为此,我在循环中执行以下操作:
投票Kafka
处理一个多月前发布的所有邮件
提交最新此类消息的偏移量+1
重复
这实际上是不起作用的,因为poll从停止的地方返回消息,并且忽略提交,除非发生重新平衡。
所以我必须缓冲未读的信息。但我还是要打电话给Perl,否则Kafka会假设你已经死了,然后重新平衡。每次投票将返回更多数据。这意味着我将在这个缓冲区中存储大量的数据,这并不理想。
理想的做法是告诉Kafka“我还活着”,而不是要求更多的信息。这样我就可以循环直到缓冲区为空,然后再请求更多的消息。
我的代码是这样的:

def run(): Unit = {
    properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
    properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    val consumer = new KafkaConsumer[Long, Array[Byte]](properties, new LongDeserializer().asInstanceOf[Deserializer[Long]], new ByteArrayDeserializer)
    consumer.subscribe(List(topic).asJava)
    while (true) {
      val pollResult = consumer.poll(Duration.ofSeconds(1))

      val commitMap = for (partition <- pollResult.partitions().asScala) yield {
        val records = pollResult.records(partition).asScala
        val record = records.find(record => {
          if (record.timestamp() + secondsInMonth * 1000 < Instant.now.toEpochMilli) {
            DoAction(record)
            false
          }
          else {
            true
          }
        })
        val offset = new OffsetAndMetadata(record.map(_.offset()).getOrElse(records.last.offset() + 1))
        (partition, offset)
      }

      consumer.commitSync(commitMap.toMap.asJava, Duration.ofSeconds(1))
    }
  }
5lhxktic

5lhxktic1#

你可以用 consumer.seek 以确保投票继续进行。因此,代码将被修改为:

def run(): Unit = {
    properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
    properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    val consumer = new KafkaConsumer[Long, Array[Byte]](properties, new LongDeserializer().asInstanceOf[Deserializer[Long]], new ByteArrayDeserializer)
    consumer.subscribe(List(topic).asJava)
    while (true) {
      val pollResult = consumer.poll(Duration.ofSeconds(1))

      val commitMap = for (partition <- pollResult.partitions().asScala) yield {
        val records = pollResult.records(partition).asScala
        val record = records.find(record => {
          if (record.timestamp() + secondsInMonth * 1000 < Instant.now.toEpochMilli) {
            DoAction(record)
            false
          }
          else {
            true
          }
        })
        val offset = new OffsetAndMetadata(record.map(_.offset()).getOrElse(records.last.offset() + 1))
        consumer.seek(partition, offset)
        (partition, offset)
      }

      consumer.commitSync(commitMap.toMap.asJava, Duration.ofSeconds(1))
    }
  }

相关问题