java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access

xt0899hw · asked 2021-06-07 · in Kafka

I have a Scala Spark Streaming application that receives data on the same topic from 3 different Kafka producers.

The Spark Streaming application runs on the host 0.0.0.179, the Kafka broker on the host 0.0.0.178, and the Kafka producers on the machines 0.0.0.180, 0.0.0.181 and 0.0.0.182.

When I try to run the Spark Streaming application I get the following error:
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 19.0 failed 1 times, most recent failure: Lost task 0.0 in stage 19.0 (TID 19, localhost): java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1625)
    at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1198)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:95)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:69)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:228)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:194)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$7.apply$mcV$sp(PairRDDFunctions.scala:1204)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$7.apply(PairRDDFunctions.scala:1203)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$7.apply(PairRDDFunctions.scala:1203)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1325)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1211)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1190)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
I have read a thousand different posts by now, but nobody seems to have found a solution to this problem.
How should I handle this in my application? Do I have to change some Kafka parameter (currently the num.partitions parameter is set to 1)?
Here is the code of my application:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods.parse

// Create the context with a 3 second batch size
val sparkConf = new SparkConf().setAppName("SparkScript").set("spark.driver.allowMultipleContexts", "true").set("spark.streaming.concurrentJobs", "3").setMaster("local[4]")
val sc = new SparkContext(sparkConf)

val ssc = new StreamingContext(sc, Seconds(3))

case class Thema(name: String, metadata: String)
case class Tempo(unit: String, count: Int, metadata: String)
case class Spatio(unit: String, metadata: String)
case class Stt(spatial: Spatio, temporal: Tempo, thematic: Thema)
case class Location(latitude: Double, longitude: Double, name: String)

case class Datas1(location : Location, timestamp : String, windspeed : Double, direction: String, strenght : String)
case class Sensors1(sensor_name: String, start_date: String, end_date: String, data1: Datas1, stt: Stt)    

val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "0.0.0.178:9092",
    "key.deserializer" -> classOf[StringDeserializer].getCanonicalName,
    "value.deserializer" -> classOf[StringDeserializer].getCanonicalName,
    "group.id" -> "test_luca",
    "auto.offset.reset" -> "earliest",
    "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics1 = Array("topics1")

val s1 = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topics1, kafkaParams)).map(record => {
  implicit val formats = DefaultFormats
  parse(record.value).extract[Sensors1]
})
s1.print()
s1.saveAsTextFiles("results/", "")
ssc.start()
ssc.awaitTermination()

Thank you


vc9ivgsu · #1

Your problem is here:

s1.print()
s1.saveAsTextFiles("results/", "")

Because Spark builds a flow graph per output, you have defined two flows here:

Read from Kafka -> Print to console
Read from Kafka -> Save to text file

Spark will attempt to run both of these graphs simultaneously, since they are independent of each other. And since Kafka uses a cached-consumer approach, it effectively tries to use the same consumer for both stream executions.
What you can do is cache the DStream before running the two queries:

val dataFromKafka = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topics1, kafkaParams)).map(/* stuff */)

val cachedStream = dataFromKafka.cache()
cachedStream.print()
cachedStream.saveAsTextFiles("results/", "")
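Another way to make the same point concrete, assuming the same dataFromKafka stream as above: perform both outputs inside a single foreachRDD per batch, so each batch is pulled from Kafka exactly once and then reused for both sinks. This is a sketch, not the answer's exact code; the output path and the take(10) (roughly what print() shows) are illustrative choices.

```scala
// Sketch: one foreachRDD per batch does both outputs, so the batch
// is read from Kafka only once instead of once per output stream.
dataFromKafka.foreachRDD { rdd =>
  rdd.cache()                                    // materialise the batch once
  rdd.take(10).foreach(println)                  // console output, like print()
  rdd.saveAsTextFile(s"results/batch-${System.currentTimeMillis}")
  rdd.unpersist()                                // free the cached batch
}
```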

kpbpu008 · #2

Using cache worked for me too. In my case, a print, then a transformation, then another print on a JavaPairDStream gave me this error. I added cache before the first print and it worked for me. This code fails:

s1.print()
s1.saveAsTextFiles("results/", "")

The code below will work; I used similar code.

s1.cache();
s1.print();
s1.saveAsTextFiles("results/", "");
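For completeness, the spark-streaming-kafka-0-10 integration also exposes a knob for the consumer cache that causes the sharing in the first place. A minimal sketch, assuming that integration version and accepting that each task then reconnects to Kafka (slower, but no shared KafkaConsumer):

```scala
import org.apache.spark.SparkConf

// Sketch: disable the per-executor Kafka consumer cache so that
// concurrent tasks no longer share one KafkaConsumer instance.
val sparkConf = new SparkConf()
  .setAppName("SparkScript")
  .setMaster("local[4]")
  .set("spark.streaming.kafka.consumer.cache.enabled", "false")
```

Caching the stream, as in the answers above, is usually the better fix; this setting just trades the error for extra connection overhead.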
