我在尝试使用sparkdstreams读取kafka时遇到以下错误
这是卡法克的密码
public void start() throws InterruptedException {
JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(
jssc,
LocationStrategies.PreferBrokers(),
ConsumerStrategies.<String, String>Subscribe(topicsList, kafkaParams));
messages.foreachRDD(rdd -> {
JavaPairRDD<String, String> pairRdd = rdd.mapToPair(record-> {
logger.debug("Reading Message : " +record.value() + " Offset = "+record.offset() + " Partition = "+record.partition());
return new Tuple2<>(record.key(), record.value());
});
sparkEtlManager.etl(pairRdd);
((CanCommitOffsets) messages.inputDStream()).commitAsync(offsetRanges);
});
这是我看到的例外。最奇怪的是,当使用Kafka制作人在本地进行测试时,它运行良好。
java.lang.IllegalArgumentException: requirement failed: Failed to get records for compacted spark-executor-FtsTopicConsumerGrpTESTING_5 fts.analytics-0 after polling for 600000
at scala.Predef$.require(Predef.scala:224)
at org.apache.spark.streaming.kafka010.InternalKafkaConsumer.compactedNext(KafkaDataConsumer.scala:178)
at org.apache.spark.streaming.kafka010.KafkaDataConsumer$class.compactedNext(KafkaDataConsumer.scala:56)
at org.apache.spark.streaming.kafka010.KafkaDataConsumer$CachedKafkaDataConsumer.compactedNext(KafkaDataConsumer.scala:212)
at org.apache.spark.streaming.kafka010.CompactedKafkaRDDIterator.next(KafkaRDD.scala:308)
at org.apache.spark.streaming.kafka010.CompactedKafkaRDDIterator.next(KafkaRDD.scala:272)
at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1819)
at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1213)
at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1213)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
暂无答案!
目前还没有任何答案,快来回答吧!