scalaDataframe连接的优化代码

hkmswyz6  于 2021-07-13  发布在  Spark
关注(0)|答案(1)|浏览(289)

新的Spark。我想基于一个公共列连接两个Dataframe
下面的代码片段我也在使用,但它占用了太多的时间。我如何才能优化相同的。请引导。我认为collect()产生了一个问题。

test_df.collect().foreach(row => {
      val split_data = row.mkString(",").split(",")
      val tag = java.lang.Long.parseLong(split_data(1))

      val otherList = other_df.select(col("Tags")).filter(other_df("tag") === tag).map(_.getString(0)).collect.toList
      if (otherList.size > 1) {
        val lat = java.lang.Double.parseDouble(split_data(2))
        val long = java.lang.Double.parseDouble(split_data(3))
        tagname = getFinalTag(lat, long, otherList)
      } else if (otherList.size == 1) {
        tagname = otherList.head
      }
mw3dktmi

mw3dktmi1#

您好这里有一些建议使用Spark:避免收集尽可能多。收集带来的所有数据到一个节点,在网络中创建了大量的流量,如果你要做一些进程,它可能无法完成,如果你没有足够的内存或cpu能力。。。如果您想连接两个Dataframe,其中一个Dataframe较小,只需广播大小最小的Dataframe,因此spark将在所有节点上复制它,这将减少无序排列的数量,并在网络上创建更少的通信量(连接不同的群集节点),我建议您使用reduce by key(而不是collect)来选择应用于数据的最佳函数。这是针对您的主要问题,否则在您的代码中,您没有使用预定义的spark join(除非我没有看到),如果您共享您的代码和您想要做的事情(您的逻辑),您可能会得到更精确的答案。我希望它能帮助你。

相关问题