spark kafka-无法在120000毫秒内获取偏移量的记录

busg9geu  于 2021-05-18  发布在  Spark
关注(0)|答案(0)|浏览(463)

我用spark来读Kafka的主题。
这是我的密码:

val df = spark.readStream.format("kafka").
        option("kafka.bootstrap.servers", "myendpoint.servicebus.windows.net:9093").
        option("kafka.security.protocol", "SASL_SSL").
        option("subscribe", "twitter").
        option("kafka.sasl.mechanism", "PLAIN").
        option("startingOffsets", "earliest").
        load()

val rawdata = df.selectExpr("CAST(value AS STRING)").as[String]

val query = rawdata.writeStream.outputMode("append").format("console").start()
query.awaitTermination()

错误如下:

java.util.concurrent.TimeoutException: Cannot fetch record for offset 446451 in 120000 milliseconds
    at org.apache.spark.sql.kafka010.InternalKafkaConsumer.org$apache$spark$sql$kafka010$InternalKafkaConsumer$$fetchData(KafkaDataConsumer.scala:271)
    at org.apache.spark.sql.kafka010.InternalKafkaConsumer$$anonfun$get$1.apply(KafkaDataConsumer.scala:163)
    at org.apache.spark.sql.kafka010.InternalKafkaConsumer$$anonfun$get$1.apply(KafkaDataConsumer.scala:147)
    at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
    at org.apache.spark.sql.kafka010.InternalKafkaConsumer.runUninterruptiblyIfPossible(KafkaDataConsumer.scala:109)
    at org.apache.spark.sql.kafka010.InternalKafkaConsumer.get(KafkaDataConsumer.scala:147)
    at org.apache.spark.sql.kafka010.KafkaDataConsumer$class.get(KafkaDataConsumer.scala:54)
    at org.apache.spark.sql.kafka010.KafkaDataConsumer$CachedKafkaDataConsumer.get(KafkaDataConsumer.scala:362)
    at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:151)
    at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:142)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.foreach(WholeStageCodegenExec.scala:612)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2.scala:133)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2.scala:132)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1415)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2.scala:138)
    at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$2.apply(WriteToDataSourceV2.scala:79)
    at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$2.apply(WriteToDataSourceV2.scala:78)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

一些附加信息:我正在使用azurehdinsight spark环境,它使用kafka库连接到事件中心。
spark版本: 2.3.2.2.6.5.3026-7 hdinsight版本: 3.6.1000.67 eventhub分区计数: 8 Kafka图书馆版本:

org.apache.spark:spark-streaming-kafka-0-10_2.11:2.3.2,
org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.2,
org.apache.kafka:kafka-clients:0.10.2.1

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题