spark流中的转换需要更多的时间,即使有0条消息

yqhsw0fo  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(253)

我有严重的性能问题与Spark流。对于10秒的批处理间隔,程序大约需要2分钟。我尝试在没有来自kafka主题的0条消息的情况下进行调试。大多数转换都需要30秒以上的时间,即使没有要使用/处理的消息。尽管decodemessagesdf中没有消息,但下面的命令大约需要40秒。

val enrichedDF: DataFrame = decodeMessagesDF.join(broadcast(customer), (decodeMessagesDF( "rowkey") === customer("rowkey")) && (customer("mkt_opto_flag") === "N") && (customer("ads_opto_flag") === "N"))

另外,下面的发布代码也需要大约30秒才能发布0条消息

message.foreachPartition{ part =>
  val producer = new KafkaProducer[String, String](props)
  part.foreach{ msg =>
    val message = new ProducerRecord[String, String](topic, msg._1, msg._2)
    producer.send(message)
  }
  producer.close()

}
如果密码有什么问题,请告诉我。谢谢

6bc51xsx

6bc51xsx1#

如果“customer”有大量数据,则广播是昂贵的。也许您应该将broadcast(customer)从join操作中删除,如下所示:

val consumerBroadcast = sc.broadcast(customer)
    val enrichedDF: DataFrame = decodeMessagesDF.join(broadcast(consumerBroadcast), (decodeMessagesDF( "rowkey") === customer("rowkey")) && (customer("mkt_opto_flag") === "N") && (customer("ads_opto_flag") === "N"))

此代码将只广播客户一次。但您的代码将广播每批客户。

相关问题