:)
我在一个(奇怪的)情况中结束了自己,在这个情况下,简单地说,我不想消费kafka的任何新记录,所以暂停主题中所有分区的sparkstreaming消费(inputdstream[consumerrecord]),执行一些操作,最后,继续消费记录。
首先。。。这可能吗?
我一直在尝试这样的事情:
var consumer: KafkaConsumer[String, String] = _
consumer = new KafkaConsumer[String, String](properties)
consumer.subscribe(java.util.Arrays.asList(topicName))
consumer.pause(consumer.assignment())
...
consumer.resume(consumer.assignment())
但我知道了:
println(s"Assigned partitions: $consumer.assignment()") --> []
println(s"Paused partitions: ${consumer.paused()}") --> []
println(s"Partitions for: ${consumer.partitionsFor(topicNAme)}") --> [Partition(topic=topicAAA, partition=0, leader=1, replicas=[1,2,3], partition=1, ... ]
任何帮助我理解我遗漏了什么,为什么我得到空的结果时,很明显消费者已分配分区将受到欢迎!
版本:kafka:0.10 spark:2.3.0 scala:2.11.8
1条答案
按热度按时间watbbzwu1#
是的,可以在代码中添加检查点并传递持久存储(本地磁盘、s3、hdfs)路径
无论何时开始/恢复作业,它都会从检查点提取带有消费者偏移的kafka消费者组信息,并从停止位置开始处理。
spark check-=pointing不仅是一种保存偏移量的机制,而且还可以保存阶段和作业的dag的序列化状态。因此,每当您用新代码重新启动作业时
读取并处理序列化数据
如果spark应用程序中有任何代码更改,请清理缓存的dag阶段
使用最新代码从新数据恢复处理。
现在,从磁盘读取只是spark加载kafka偏移量、dag和旧的未完成处理数据所需的一次性操作。
一旦完成,它将始终按默认或指定的检查点间隔将数据保存到磁盘。
spark streaming提供了指定kafka组id的选项,但spark structured stream没有。