我正在使用下面的spark streaming Scala代码来消耗来自producer topic的真实的Kafka消息。但问题是有时候我的工作会因为服务器连接或其他原因而失败,并且在我的代码中自动提交属性设置为true,因为一些消息丢失,无法存储在我的数据库中。
所以我只想知道如果我们想从特定的偏移量中提取旧Kafka消息,有没有办法。我尝试将“auto.offset.reset”设置为最早或最新,但它只获取尚未提交的新消息。
我们以当前偏移量编号为1060且自动偏移量重置属性最早为例,因此当我重新启动作业时,它会从1061开始阅读消息,但在某些情况下,如果我想从偏移量编号1020读取旧Kafka消息,那么是否有任何属性可用于从特定偏移量编号开始使用消息
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.StreamingContext._
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
val topic = "test123"
val kafkaParams = Map(
"bootstrap.servers" -> "localhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[KafkaAvroDeserializer],
"schema.registry.url" -> "http://abc.test.com:8089"
"group.id" -> "spark-streaming-notes",
"auto.offset.reset" -> "earliest"
"enable.auto.commit" -> true
)
val stream = KafkaUtils.createDirectStream[String, Object](
ssc,
PreferConsistent,
Subscribe[String, Object](topic, KafkaParams)
stream.print()
ssc.start()
ssc.awaitTermination()
1条答案
按热度按时间ogsagwnx1#
在Spark Streaming中,你不能这样做,你需要使用
kafka-consumer-groups
CLI来提交特定于你的组ID的偏移量,或者手动构造一个KafkaConsumer示例,并在启动Spark上下文之前调用commitSync。结构化流确实提供
startingOffsets
配置。auto.offset.reset只适用于不存在的group.id