kafka spark数据流无法获取记录

64jmpszr  于 2021-05-27  发布在  Spark
关注(0)|答案(0)|浏览(634)

我在尝试使用sparkdstreams读取kafka时遇到以下错误
这是卡法克的密码

public void start() throws InterruptedException {
        JavaInputDStream<ConsumerRecord<String, String>> messages =  KafkaUtils.createDirectStream(
                jssc, 
                LocationStrategies.PreferBrokers(), 
                ConsumerStrategies.<String, String>Subscribe(topicsList, kafkaParams));  
        messages.foreachRDD(rdd -> {

            JavaPairRDD<String, String> pairRdd = rdd.mapToPair(record-> {
                logger.debug("Reading Message : " +record.value() + " Offset = "+record.offset() + " Partition = "+record.partition());
                return new Tuple2<>(record.key(), record.value());    
            });
            sparkEtlManager.etl(pairRdd);
            ((CanCommitOffsets) messages.inputDStream()).commitAsync(offsetRanges);           
        });

这是我看到的例外。最奇怪的是,当使用Kafka制作人在本地进行测试时,它运行良好。

java.lang.IllegalArgumentException: requirement failed: Failed to get records for compacted spark-executor-FtsTopicConsumerGrpTESTING_5 fts.analytics-0 after polling for 600000
            at scala.Predef$.require(Predef.scala:224)
            at org.apache.spark.streaming.kafka010.InternalKafkaConsumer.compactedNext(KafkaDataConsumer.scala:178)
            at org.apache.spark.streaming.kafka010.KafkaDataConsumer$class.compactedNext(KafkaDataConsumer.scala:56)
            at org.apache.spark.streaming.kafka010.KafkaDataConsumer$CachedKafkaDataConsumer.compactedNext(KafkaDataConsumer.scala:212)
            at org.apache.spark.streaming.kafka010.CompactedKafkaRDDIterator.next(KafkaRDD.scala:308)
            at org.apache.spark.streaming.kafka010.CompactedKafkaRDDIterator.next(KafkaRDD.scala:272)
            at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1819)
            at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1213)
            at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1213)
            at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
            at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
            at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
            at org.apache.spark.scheduler.Task.run(Task.scala:123)
            at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
            at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
            at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题