如何通过Spark streaming Scala在使用Kafka主题消息时设置特定的偏移量

mgdq6dx1  于 2022-12-18  发布在  Scala
关注(0)|答案(1)|浏览(147)

我正在使用下面的spark streaming Scala代码来消耗来自producer topic的真实的Kafka消息。但问题是有时候我的工作会因为服务器连接或其他原因而失败,并且在我的代码中自动提交属性设置为true,因为一些消息丢失,无法存储在我的数据库中。
所以我只想知道如果我们想从特定的偏移量中提取旧Kafka消息,有没有办法。我尝试将“auto.offset.reset”设置为最早或最新,但它只获取尚未提交的新消息。
我们以当前偏移量编号为1060且自动偏移量重置属性最早为例,因此当我重新启动作业时,它会从1061开始阅读消息,但在某些情况下,如果我想从偏移量编号1020读取旧Kafka消息,那么是否有任何属性可用于从特定偏移量编号开始使用消息

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.StreamingContext._ 

val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))

val topic = "test123"
val kafkaParams = Map(
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[KafkaAvroDeserializer],
  "schema.registry.url" -> "http://abc.test.com:8089"
  "group.id" -> "spark-streaming-notes",
  "auto.offset.reset" -> "earliest"
  "enable.auto.commit" -> true
)

val stream = KafkaUtils.createDirectStream[String, Object](
ssc,
PreferConsistent,
Subscribe[String, Object](topic, KafkaParams)

stream.print()

ssc.start()
ssc.awaitTermination()
ogsagwnx

ogsagwnx1#

在Spark Streaming中,你不能这样做,你需要使用kafka-consumer-groups CLI来提交特定于你的组ID的偏移量,或者手动构造一个KafkaConsumer示例,并在启动Spark上下文之前调用commitSync。
结构化流确实提供startingOffsets配置。
auto.offset.reset只适用于不存在的group.id

相关问题