KafkaConsumer ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access

jdg4fx2g · posted 2021-06-08 in Kafka

We invoke a SparkSQL job from Spark Streaming and are getting a ConcurrentModificationException followed by a "KafkaConsumer is closed" error. The code and exception details are below:

Kafka consumer code

    // Start reading messages from Kafka and get a DStream
    final JavaInputDStream<ConsumerRecord<String, byte[]>> consumerStream = KafkaUtils.createDirectStream(
            getJavaStreamingContext(), LocationStrategies.PreferConsistent(),
            ConsumerStrategies.<String, byte[]>Subscribe(SparkServiceConfParams.AIR.CONSUME_TOPICS,
                    sparkServiceConf.getKafkaConsumeParams()));
    ThreadContext.put(Constants.CommonLiterals.LOGGER_UID_VAR, CommonUtils.loggerUniqueId());
    // Extract the binary payload from each record (this intermediate step was elided in the post)
    JavaDStream<byte[]> messagesStream = consumerStream.map(ConsumerRecord::value);
    // Decode each binary message and generate a JSON string
    JavaDStream<String> decodedStream = messagesStream.map(new Function<byte[], String>() { ... });

..

    // Publish the generated gzipped JSON to Kafka
    decodedStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
        private static final long serialVersionUID = 1L;

        @Override
        public void call(JavaRDD<String> jsonRdd4DF) throws Exception {
            //Dataset<Row> json = sparkSession.read().json(jsonRdd4DF);
            if (!jsonRdd4DF.isEmpty()) {
                //JavaRDD<String> jsonRddDF = getJavaSparkContext().parallelize(jsonRdd4DF.collect());
                Dataset<Row> json = sparkSession.read().json(jsonRdd4DF);
                SparkAIRMainJsonProcessor airMainJsonProcessor = new SparkAIRMainJsonProcessor();
                AIRDataSetBean processAIRData = airMainJsonProcessor.processAIRData(json, sparkSession);

Error details

    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
    Caused by: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access

Finally, the Kafka consumer is closed:

    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
    Caused by: java.lang.IllegalStateException: This consumer has already been closed.

8wigbo56 1#

Using Spark Streaming's cache (or persist) option solves this problem. Each action on the batch RDD, such as isEmpty() and the job triggered by read().json(...), recomputes the Kafka-backed RDD, and those recomputations can end up driving the same underlying KafkaConsumer from more than one thread. With the stream cached, each batch is materialized from Kafka only once and later uses read the cached blocks, so the stream can safely be consumed multiple times. Use the cache option judiciously, though. Here is the code (a fuller sketch follows below):

    JavaDStream<ConsumerRecord<String, byte[]>> cache = consumerStream.cache();
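For orientation, here is a minimal, self-contained sketch of where the cache() call fits, assuming a local setup; the broker address, topic name, group id, and the two downstream actions are illustrative placeholders, not the original job's configuration:

    import java.util.*;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.spark.SparkConf;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.*;
    import org.apache.spark.streaming.kafka010.*;

    public class CachedStreamSketch {
        public static void main(String[] args) throws InterruptedException {
            SparkConf conf = new SparkConf().setAppName("cached-kafka-stream").setMaster("local[2]");
            JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));

            Map<String, Object> kafkaParams = new HashMap<>();
            kafkaParams.put("bootstrap.servers", "localhost:9092");   // placeholder broker
            kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            kafkaParams.put("group.id", "air-consumer-group");        // placeholder group id

            JavaInputDStream<ConsumerRecord<String, byte[]>> consumerStream = KafkaUtils.createDirectStream(
                    jssc, LocationStrategies.PreferConsistent(),
                    ConsumerStrategies.<String, byte[]>Subscribe(
                            Collections.singletonList("air-topic"),   // placeholder topic
                            kafkaParams));

            // Cache the stream so each batch RDD is materialized from Kafka once;
            // later uses of the same batch read the cached blocks instead of the consumer.
            JavaDStream<ConsumerRecord<String, byte[]>> cached = consumerStream.cache();

            // Two downstream uses of the same batch: without cache(), each action
            // would recompute the Kafka RDD and re-drive the same KafkaConsumer.
            cached.foreachRDD(rdd -> System.out.println("batch size: " + rdd.count()));
            cached.map(record -> new String(record.value())).print();

            jssc.start();
            jssc.awaitTermination();
        }
    }

The point of the sketch: both foreachRDD and print() act on the same batch, and with cache() in place the batch is fetched from Kafka only once, which is exactly the concurrent-access pattern the answer's one-liner avoids.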
