I have already seen a similar question (click here), but I still want to know: is streaming data from a specific partition really impossible? I am using the Kafka consumer strategy Subscribe in Spark Streaming:
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsets)
Here is the code snippet I use to subscribe to the topic and partition:
val topics = Array("cdc-classic")
val topic = "cdc-classic"
val partition = 2
val offsets =
  Map(new TopicPartition(topic, partition) -> 2L) // I am not clear about this line (I tried to set topic and partition number as 2)
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams, offsets))
But when I run this code, I get the following exception:
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 0.0 failed 1 times, most recent failure: Lost task 5.0 in stage 0.0 (TID 5, localhost, executor driver): org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {cdc-classic-2=2}
at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:878)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:525)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1110)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:70)
Caused by: org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {cdc-classic-2=2}
at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:878)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:525)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1110)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
P.S.: cdc-classic is a topic with 17 partitions.
2 Answers

yeotifhr #1
Specify the partition number and the starting offset of the partition you want to stream from in this line:

Map(new TopicPartition(topic, partition) -> 2L)

where

partition is the partition number, and
2L is the starting offset for that partition.

Then you can stream data from the selected partition.
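For reference, if the goal is to read only that one partition rather than the whole topic, the Spark Kafka 0-10 integration also provides ConsumerStrategies.Assign, which restricts the stream to an explicit list of partitions. A minimal sketch, assuming the same ssc and kafkaParams values from the question:

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Assign

// stream only partition 2 of cdc-classic, starting at offset 2
val tp = new TopicPartition("cdc-classic", 2)
val fromOffsets = Map(tp -> 2L)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Assign[String, String](List(tp), kafkaParams, fromOffsets))

Unlike Subscribe, Assign never touches the other 16 partitions, so no offsets need to be valid for them.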
qc6wkl3g #2
A Kafka partition is Spark's unit of parallelism. So even though this is technically possible, it makes little sense: all the data would be processed by a single executor. Instead of Spark, you can simply run your process with a plain KafkaConsumer (https://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html).
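As a rough illustration of that approach, a plain consumer pinned to one partition could look like the sketch below; the broker address and group id are assumptions, not taken from the question:

import java.util.{Arrays, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import scala.collection.JavaConverters._

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092") // assumption: replace with your brokers
props.put("group.id", "cdc-classic-reader")      // assumption: any id works with assign()
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

val consumer = new KafkaConsumer[String, String](props)
val tp = new TopicPartition("cdc-classic", 2)
consumer.assign(Arrays.asList(tp)) // read exactly this partition, no group rebalancing
consumer.seek(tp, 2L)              // start from offset 2, as in the question

while (true) {
  val records = consumer.poll(1000L) // poll(long) is the signature in the 0.11 client
  records.asScala.foreach(r => println(s"${r.offset}: ${r.value}"))
}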
If you want to benefit from Spark's automatic retries, you can simply build a Docker image with your consumer and launch it on Kubernetes with an appropriate retry configuration.
Regarding Spark, if you really want to use it, you should check what offsets actually exist in the partition you read. You probably provided an invalid one, which triggers the "out of range" offset message (maybe the partition starts with 0?).
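One way to check is to ask the broker for the valid offset range of that partition; beginningOffsets and endOffsets have been available on KafkaConsumer since 0.10.1. A small sketch, reusing the props and imports from the consumer sketch above:

// query the earliest and latest offsets of cdc-classic partition 2
val tp = new TopicPartition("cdc-classic", 2)
val probe = new KafkaConsumer[String, String](props)
val earliest = probe.beginningOffsets(Arrays.asList(tp)).get(tp)
val latest = probe.endOffsets(Arrays.asList(tp)).get(tp)
println(s"valid offsets for $tp: [$earliest, $latest)") // a start offset outside this range raises OffsetOutOfRangeException
probe.close()

If retention has already deleted the first messages, earliest will be greater than 2, which would explain the exception in the question.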