我有严重的性能问题与Spark流。对于10秒的批处理间隔,程序大约需要2分钟。我尝试在没有来自kafka主题的0条消息的情况下进行调试。大多数转换都需要30秒以上的时间,即使没有要使用/处理的消息。尽管decodemessagesdf中没有消息,但下面的命令大约需要40秒。
val enrichedDF: DataFrame = decodeMessagesDF.join(broadcast(customer), (decodeMessagesDF( "rowkey") === customer("rowkey")) && (customer("mkt_opto_flag") === "N") && (customer("ads_opto_flag") === "N"))
另外,下面的发布代码也需要大约30秒才能发布0条消息
message.foreachPartition{ part =>
val producer = new KafkaProducer[String, String](props)
part.foreach{ msg =>
val message = new ProducerRecord[String, String](topic, msg._1, msg._2)
producer.send(message)
}
producer.close()
}
如果密码有什么问题,请告诉我。谢谢
1条答案
按热度按时间6bc51xsx1#
如果“customer”有大量数据,则广播是昂贵的。也许您应该将broadcast(customer)从join操作中删除,如下所示:
此代码将只广播客户一次。但您的代码将广播每批客户。