How to get the last offset committed by a Kafka consumer after the consumer (a Spark job) fails (Scala)

wa7juj8i  posted on 2021-06-07 in Kafka

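Before I go into any details, please note that I am not asking how to get the latest offset from the console with kafka-run-class.sh kafka.tools.ConsumerOffsetChecker.
I am trying to create a fault-tolerant Kafka consumer (Kafka version 0.10) in Spark (2.3.1) using Scala (2.11.8). By fault tolerant I mean that if the Kafka consumer dies for some reason and is restarted, it should resume consuming messages from the last offset.
To achieve this, I commit the Kafka offsets after consuming them, using the code below: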

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "group_101",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean), /* because messages successfully polled by the consumer may not yet have resulted in a Spark output operation */
  "session.timeout.ms" -> (30000: java.lang.Integer),
  "heartbeat.interval.ms" -> (3000: java.lang.Integer)
)

val topic = Array("topic_1")

val offsets = Map(new org.apache.kafka.common.TopicPartition("kafka_cdc_1", 0) -> 2L) /* Edit: Added code to fetch offset */

val kstream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topic, kafkaParams, offsets) /* Edit: Added offset */
)

kstream.foreachRDD { rdd =>
  val offsetRange = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  if (!rdd.isEmpty()) {
    val rawRdd = rdd.map(record => (record.key(), record.value())).map(_._2).toDS()
    val df = spark.read.schema(tabSchema).json(rawRdd)
    df.createOrReplaceTempView("temp_tab")
    df.write.insertInto("hive_table")
  }
  kstream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRange) /* Doing Async Commit Here */
}

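I have tried many ways to get the latest offset for a given topic, but none of them worked.
Can someone help me with Scala code to achieve this?
Edit: in the code above, I tried to use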

val offsets = Map(new org.apache.kafka.common.TopicPartition("kafka_cdc_1", 0) -> 2L) /*Edit: Added code to fetch offset*/

But the offset fetched by the code above is 0, not the latest one. Is there a way to get the latest offset?

wfveoks0  #1

I found a solution to the problem above. Here it is, in the hope that it helps anyone who needs it.
Language: Scala, Spark job

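import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

// Assumes ssc (StreamingContext), spark (SparkSession) and tabSchema (StructType) are defined elsewhere,
// and that spark.implicits._ is imported so that toDS() is available.
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "group_101",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean), /* because messages successfully polled by the consumer may not yet have resulted in a Spark output operation */
  "session.timeout.ms" -> (30000: java.lang.Integer),
  "heartbeat.interval.ms" -> (3000: java.lang.Integer)
)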

import java.util.Properties

// Create a Properties object with the same Kafka parameters as above.
// Note: both are needed. kafkaParams configures the direct stream; this Properties object is used only to fetch the last committed offset.

val kafka_props = new Properties()
kafka_props.put("bootstrap.servers", "localhost:9092")
kafka_props.put("key.deserializer", classOf[StringDeserializer])
kafka_props.put("value.deserializer", classOf[StringDeserializer])
kafka_props.put("group.id", "group_101")
kafka_props.put("auto.offset.reset", "latest")
kafka_props.put("enable.auto.commit", (false: java.lang.Boolean))
kafka_props.put("session.timeout.ms", (30000: java.lang.Integer))
kafka_props.put("heartbeat.interval.ms", (3000: java.lang.Integer))

val topic = Array("topic_1")

/*val offsets = Map(new org.apache.kafka.common.TopicPartition("topic_1", 0) -> 2L) Edit: Added code to fetch offset*/

val topicAndPartition = new org.apache.kafka.common.TopicPartition("topic_1", 0) // Partition 0 is used because this topic has only a single partition
val consumer = new KafkaConsumer[String, String](kafka_props) // Second consumer, used only to look up the last committed offset

// committed() asks the broker for the group's last committed offset; it does not require subscribe() or assign().
// It returns null when the group has never committed for this partition, so the result is wrapped in Option.
val fetch_from_offset: Map[org.apache.kafka.common.TopicPartition, Long] =
  try {
    Option(consumer.committed(topicAndPartition))
      .map(offsetAndMetadata => Map(topicAndPartition -> offsetAndMetadata.offset())) // Map[TopicPartition, Long] of starting offsets
      .getOrElse(Map.empty) // No commit yet: pass no explicit offset and let auto.offset.reset decide
  } finally {
    consumer.close() // Release the helper consumer once the offset has been read
  }

val kstream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topic, kafkaParams, fetch_from_offset) // Pass the Map[TopicPartition, Long] of starting offsets created earlier
)

kstream.foreachRDD { rdd =>
  val offsetRange = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  if (!rdd.isEmpty()) {
    val rawRdd = rdd.map(record => (record.key(), record.value())).map(_._2).toDS()
    val df = spark.read.schema(tabSchema).json(rawRdd)
    df.createOrReplaceTempView("temp_tab")
    df.write.insertInto("hive_table")
  }
  kstream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRange) /* Doing Async offset Commit Here */
}
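
The code above looks up the committed offset for partition 0 only. If the topic ever gets more partitions, the same lookup can be repeated for each of them. Below is a minimal sketch of that idea, assuming kafka-clients is on the classpath (as it already is for the Spark job). The names `CommittedOffsets`, `committedOffsets` and `withoutMissing` are made up for this example and are not part of Kafka or Spark. It uses the single-partition `committed(TopicPartition)` call that matches the 0.10 client in the question; newer clients deprecate it in favour of an overload that takes a set of partitions.

```scala
import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition

// Hypothetical helper object, not part of Kafka or Spark.
object CommittedOffsets {

  // Drops partitions whose committed offset is null (the group never committed there)
  // and unwraps the remaining offsets to plain Longs.
  def withoutMissing(raw: Map[TopicPartition, OffsetAndMetadata]): Map[TopicPartition, Long] =
    raw.collect { case (tp, meta) if meta != null => tp -> meta.offset() }

  // Looks up the last committed offset of every partition of `topic`
  // for the consumer group configured in `props` (group.id).
  def committedOffsets(topic: String, props: Properties): Map[TopicPartition, Long] = {
    val consumer = new KafkaConsumer[String, String](props)
    try {
      // partitionsFor may yield null for an unknown topic in some client versions, hence Option
      val partitions = Option(consumer.partitionsFor(topic)).map(_.asScala.toList).getOrElse(Nil)
      val raw = partitions.map { info =>
        val tp = new TopicPartition(info.topic(), info.partition())
        tp -> consumer.committed(tp) // null when nothing has been committed yet
      }.toMap
      withoutMissing(raw)
    } finally {
      consumer.close() // always release the helper consumer
    }
  }
}
```

It could then be passed to the stream as `Subscribe[String, String](topic, kafkaParams, CommittedOffsets.committedOffsets("topic_1", kafka_props))`. According to the Spark `ConsumerStrategies.Subscribe` documentation, a partition that is missing from the offsets map starts from the group's committed offset if there is one, and otherwise from `auto.offset.reset`, so an empty map on the very first run should be harmless.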
