Kafka Streams exactly_once guarantee - skipped offsets in Kafka

cu6pst1q  asked on 2021-06-07  in  Kafka

I am using Spark 2.2.0 with the Kafka 0.10 Spark Streaming library to read from a topic that is populated by a Kafka Streams Scala application. The Kafka broker version is 0.11 and the Kafka Streams version is 0.11.0.2.
When I set the exactly-once processing guarantee in the Kafka Streams application:

p.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE)
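
For context, a minimal sketch of the surrounding configuration (the application id and broker address are placeholders, not from my actual setup):

import java.util.Properties
import org.apache.kafka.streams.StreamsConfig

val p = new Properties()
p.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app")     // placeholder app id
p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")  // placeholder broker
// Requires brokers >= 0.11; makes the Streams app write to its
// output topics transactionally.
p.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE)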

I get this error in Spark:

java.lang.AssertionError: assertion failed: Got wrong record for spark-executor-<group.id> <topic> 0 even after seeking to offset 24
at scala.Predef$.assert(Predef.scala:170)
at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:85)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:223)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:189)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.foreach(KafkaRDD.scala:189)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.to(KafkaRDD.scala:189)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.toBuffer(KafkaRDD.scala:189)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.toArray(KafkaRDD.scala:189)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:936)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:936)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Without this property set, everything works fine.
EDIT 1: The topic populated by the Kafka Streams application (with exactly-once enabled) has a wrong end offset. When I run kafka.tools.GetOffsetShell, it gives end offset 18, but there are only 12 messages in the topic (retention is disabled). With the exactly-once guarantee disabled, these offsets match. I tried to reset the Kafka Streams application according to this, but the problem persists. A programmatic cross-check is sketched below.
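
As a minimal sketch of that cross-check (assuming kafka-clients >= 0.11; the broker address and topic name are placeholders, not my real values), this compares the broker-reported end offset with the number of records actually returned to a consumer:

import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")  // placeholder broker
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

val consumer = new KafkaConsumer[String, String](props)
val tp = new TopicPartition("my-topic", 0)  // placeholder topic/partition
consumer.assign(Collections.singletonList(tp))
consumer.seekToBeginning(Collections.singletonList(tp))

// End offset as reported by the broker (what GetOffsetShell prints).
val end = consumer.endOffsets(Collections.singletonList(tp)).get(tp)

// Count the records actually returned to the application;
// stopping at the first empty poll is good enough for a sketch.
var count = 0L
var records = consumer.poll(1000)
while (!records.isEmpty) {
  count += records.count()
  records = consumer.poll(1000)
}
println(s"end offset = $end, readable records = $count")  // here: 18 vs 12
consumer.close()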
EDIT 2: Running SimpleConsumerShell with the --print-offsets option gives the following output:

next offset = 1
{"timestamp": 149583551238149, "data": {...}}
next offset = 2
{"timestamp": 149583551238149, "data": {...}}
next offset = 4
{"timestamp": 149583551238149, "data": {...}}
next offset = 5
{"timestamp": 149583551238149, "data": {...}}
next offset = 7
{"timestamp": 149583551238149, "data": {...}}
next offset = 8
{"timestamp": 149583551238149, "data": {...}}
...

Some offsets are clearly skipped when the exactly-once guarantee is enabled.
Any ideas? What could cause this? Thanks!


fdbelqdn1#

I found out that in Kafka (version >= 0.11) offset gaps are expected behavior; they are caused by the commit/abort transaction markers.
More information about Kafka transactions and control messages can be found here:

    These transaction markers are not exposed to applications, but are used by consumers in read_committed mode to filter out messages from aborted transactions and to not return messages which are part of open transactions (i.e., those which are in the log but don't have a transaction marker associated with them).

And here.
Kafka transactions were introduced in Kafka 0.11, so I assume the spark-streaming-kafka 0.10 library is not compatible with this message format, and a newer version of spark-streaming-kafka has not been implemented yet.
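
For illustration, a minimal sketch of a plain Kafka 0.11 consumer reading such a topic in read_committed mode (broker address, group id, and topic name are placeholders); a client this recent steps over the transaction markers transparently, which is why the offsets appear to jump:

import java.util.{Collections, Properties}
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")  // placeholder broker
props.put("group.id", "txn-aware-reader")         // placeholder group id
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
// A 0.11+ consumer silently skips commit/abort markers; in
// read_committed mode it additionally filters out records from
// aborted transactions.
props.put("isolation.level", "read_committed")

val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(Collections.singletonList("my-topic"))  // placeholder topic
val records = consumer.poll(1000)
for (r <- records.asScala) {
  // Gaps between consecutive offsets are where the markers sit in the log.
  println(s"offset = ${r.offset}, value = ${r.value}")
}
consumer.close()

By contrast, the Kafka 0.10 client bundled with spark-streaming-kafka-0-10 in Spark 2.2 predates these control records, which would be consistent with the assertion failure above when it seeks to an offset that is occupied by a marker.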
