如何获得kafka主题分区的最后/结束偏移量?

kcugc4gi  于 2021-06-08  发布在  Kafka
关注(0)|答案(6)|浏览(516)

我在写一封信 kafka 使用java的消费者。我想保持消息的实时性,所以如果有太多消息等待消费,比如1000条或更多,我应该放弃未消费的消息,从最后一个偏移开始消费。
对于这个问题,我尝试比较上一个提交的偏移量和一个主题的结束偏移量(只有一个分区),如果这两个偏移量之间的差异大于一定的量,我会将主题的上一个提交的偏移量设置为下一个偏移量,这样我就可以放弃那些冗余的消息。
现在我的问题是如何得到一个主题的结尾偏移量,有人说我可以用老消费者,但是太复杂了,新消费者有这个功能吗?

xhv8bpkk

xhv8bpkk1#

我开发了下面的代码来获取偏移量状态

import java.util
import java.util.{Collections, Properties}

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.{PartitionInfo, TopicPartition}
import org.apache.kafka.common.serialization.StringDeserializer
import scala.collection.JavaConverters._

class GetOffsetRange(consumer:KafkaConsumer[String,String]) {

  def getStartOffsetRange(topic:String):util.HashMap[TopicPartition,Long]={

    val topicPartitionList = consumer.partitionsFor(topic)
    val partitionMap=new util.HashMap[TopicPartition,Long]()
    val arrTopic=new util.ArrayList[TopicPartition]()

    consumer.subscribe(Collections.singletonList(topic));

    for(topic<-topicPartitionList.asScala){
      println(topic.topic() +","+topic.partition())
      arrTopic.add(new TopicPartition(topic.topic(),topic.partition()))
    }

    consumer.poll(0)

    consumer.seekToBeginning(arrTopic)

    for(partition <- arrTopic.asScala){
      partitionMap.put(partition,consumer.position(partition)-1)
    }
    return partitionMap
  }

  def getEndOffsetRange(topic:String):util.HashMap[TopicPartition,Long]={

    val topicPartitionList = consumer.partitionsFor(topic)
    val partitionMap=new util.HashMap[TopicPartition,Long]()
    val arrTopic=new util.ArrayList[TopicPartition]()

    consumer.subscribe(Collections.singletonList(topic));

    for(topic<-topicPartitionList.asScala){
      println(topic.topic() +","+topic.partition())
      arrTopic.add(new TopicPartition(topic.topic(),topic.partition()))
    }

    consumer.poll(0)

    consumer.seekToEnd(arrTopic)

    for(partition <- arrTopic.asScala){
      partitionMap.put(partition,consumer.position(partition)-1)
    }
    return partitionMap
  }
}
vd8tlhqk

vd8tlhqk2#

自kafka1.0.1以来,消费者有一个称为endoffsets的方法
public java.util.map endoffset(java.util.collection分区)
如果你需要完整的代码,请告诉我。。
请参考apache-kafka-1.0.1-javadoc

nbysray5

nbysray53#

KafkaConsumer<String, String> consumer = ...
consumer.subscribe(Collections.singletonList(topic));
TopicPartition topicPartition = new TopicPartition(topic, partition);
consumer.poll(0);
consumer.seekToEnd(Collections.singletonList(topicPartition));
long currentOffset = consumer.position(topicPartition) -1;

上面的代码段返回给定主题和分区号的当前提交的消息偏移量。

hc8w905p

hc8w905p4#

新的消费者也很复杂。 //assign the topic consumer.assign(); //seek to end of the topic consumer.seekToEnd(); //the position is the latest offset consumer.position();

s1ag04yj

s1ag04yj5#

也可以使用kafka服务器命令行工具:

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic topic-name

输出的格式为 <topicName>:<partitionID>:<offset> ,例如。 t1:0:0 ,请参见https://jaceklaskowski.gitbooks.io/apache-kafka/kafka-tools-getoffsetshell.html 更多细节。

yruzcnhs

yruzcnhs6#

对于Kafka版本:0.10.1.1

// Get the diff of current position and latest offset
Set<TopicPartition> partitions = new HashSet<TopicPartition>();
TopicPartition actualTopicPartition = new TopicPartition(record.topic(), record.partition());
partitions.add(actualTopicPartition);
Long actualEndOffset = this.consumer.endOffsets(partitions).get(actualTopicPartition);
long actualPosition = consumer.position(actualTopicPartition);          
System.out.println(String.format("diff: %s   (actualEndOffset:%s; actualPosition=%s)", actualEndOffset -actualPosition ,actualEndOffset, actualPosition));

相关问题