我们如何组合来自同一数据类型的dataframe的两列的值并获得每个元素的计数?

8ehkhllq  于 2021-05-27  发布在  Spark
关注(0)|答案(3)|浏览(608)
val data = Seq(
  ("India","Pakistan","India"),
  ("Australia","India","India"),
  ("New Zealand","Zimbabwe","New Zealand"),
  ("West Indies", "Bangladesh","Bangladesh"),
  ("Sri Lanka","Bangladesh","Bangladesh"),
  ("Sri Lanka","Bangladesh","Bangladesh"),
  ("Sri Lanka","Bangladesh","Bangladesh")
)
val df = data.toDF("Team_1","Team_2","Winner")

我有这个数据框。我想数一数每队打了几场比赛?

up9lanfz

up9lanfz1#

可以使用union with select语句,也可以使用org.apache.spark.sql.functions.array中的数组

// METHOD 1  
df.select("Team_1").union(df.select("Team_2")).groupBy("Team_1").agg(count("Team_1")).show()

// METHOD 2
df.select(array($"Team_1", $"Team_2").as("Team")).select("Team").withColumn("Team",explode($"Team")).groupBy("Team").agg(count("Team")).show()

使用 select 声明和 union :

+-----------+-------------+
|     Team_1|count(Team_1)|
+-----------+-------------+
|  Sri Lanka|            3|
|      India|            2|
|West Indies|            1|
| Bangladesh|            4|
|   Zimbabwe|            1|
|New Zealand|            1|
|  Australia|            1|
|   Pakistan|            1|
+-----------+-------------+

Time Elapsed : 1588835600

使用 array :

+-----------+-----------+
    |       Team|count(Team)|
    +-----------+-----------+
    |  Sri Lanka|          3|
    |      India|          2|
    |West Indies|          1|
    | Bangladesh|          4|
    |   Zimbabwe|          1|
    |New Zealand|          1|
    |  Australia|          1|
    |   Pakistan|          1|
    +-----------+-----------+

    Time Elapsed : 342103600

性能方面的使用 org.apache.spark.sql.functions.array 这样更好。

knsnq2tg

knsnq2tg2#

以上答案讨论了三种方法,我试着评估(只是为了教育/意识)在绩效方面花费的时间。。。。

import org.apache.log4j.Level
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

object Katu_37 extends App {
  val logger = org.apache.log4j.Logger.getLogger("org")
  logger.setLevel(Level.WARN)

  val spark = SparkSession.builder.appName(getClass.getName)
    .master("local[*]").getOrCreate

  import spark.implicits._

  val data = Seq(
    ("India", "Pakistan", "India"),
    ("Australia", "India", "India"),
    ("New Zealand", "Zimbabwe", "New Zealand"),
    ("West Indies", "Bangladesh", "Bangladesh"),
    ("Sri Lanka", "Bangladesh", "Bangladesh"),
    ("Sri Lanka", "Bangladesh", "Bangladesh"),
    ("Sri Lanka", "Bangladesh", "Bangladesh")
  )
  val df = data.toDF("Team_1", "Team_2", "Winner")
df.show
  exec {
  println( "METHOD 1 ")
  df.select("Team_1").union(df.select("Team_2")).groupBy("Team_1").agg(count("Team_1")).show()
}
  exec {
    println( "METHOD 2 ")
    df.select(array($"Team_1", $"Team_2").as("Team")).select("Team").withColumn("Team", explode($"Team")).groupBy("Team").agg(count("Team")).show()
  }
  exec {
    println( "METHOD 3 ")
    val matchesCount = df.selectExpr("Team_1 as Teams").union(df.selectExpr("Team_2 as Teams"))
    matchesCount.groupBy("Teams").count().withColumnRenamed("count","MatchesPlayed").show()

  }

  /**
    *
    * @param f
    * @tparam T
    * @return
    */
  def exec[T](f: => T) = {

    val starttime = System.nanoTime()
    println("t = " + f)
    val endtime = System.nanoTime()
    val elapsedTime = (endtime - starttime )
//    import java.util.concurrent.TimeUnit
//    val convertToSeconds = TimeUnit.MINUTES.convert(elapsedTime, TimeUnit.NANOSECONDS)
    println("time Elapsed " +  elapsedTime  )
  }
}

结果:

+-----------+----------+-----------+
|     Team_1|    Team_2|     Winner|
+-----------+----------+-----------+
|      India|  Pakistan|      India|
|  Australia|     India|      India|
|New Zealand|  Zimbabwe|New Zealand|
|West Indies|Bangladesh| Bangladesh|
|  Sri Lanka|Bangladesh| Bangladesh|
|  Sri Lanka|Bangladesh| Bangladesh|
|  Sri Lanka|Bangladesh| Bangladesh|
+-----------+----------+-----------+

METHOD 1 
+-----------+-------------+
|     Team_1|count(Team_1)|
+-----------+-------------+
|  Sri Lanka|            3|
|      India|            2|
|West Indies|            1|
| Bangladesh|            4|
|   Zimbabwe|            1|
|New Zealand|            1|
|  Australia|            1|
|   Pakistan|            1|
+-----------+-------------+

t = ()
time Elapsed 2729302088
METHOD 2 
+-----------+-----------+
|       Team|count(Team)|
+-----------+-----------+
|  Sri Lanka|          3|
|      India|          2|
|West Indies|          1|
| Bangladesh|          4|
|   Zimbabwe|          1|
|New Zealand|          1|
|  Australia|          1|
|   Pakistan|          1|
+-----------+-----------+

t = ()
time Elapsed 646513918
METHOD 3 
+-----------+-------------+
|      Teams|MatchesPlayed|
+-----------+-------------+
|  Sri Lanka|            3|
|      India|            2|
|West Indies|            1|
| Bangladesh|            4|
|   Zimbabwe|            1|
|New Zealand|            1|
|  Australia|            1|
|   Pakistan|            1|
+-----------+-------------+

t = ()
time Elapsed 988510662

我注意到了 org.apache.spark.sql.functions.array 该方法所用的时间(646513918纳秒)比 union 接近。。。

gkl3eglg

gkl3eglg3#

val matchesCount = df.selectExpr("Team_1 as Teams").union(df.selectExpr("Team_2 as Teams"))
matchesCount.groupBy("Teams").count().withColumnRenamed("count","MatchesPlayed").show()

+-----------+--------------+
|      Teams|MatchesPlayed|
+-----------+--------------+
|  Sri Lanka|             3|
|      India|             2|
|West Indies|             1|
| Bangladesh|             4|
|   Zimbabwe|             1|
|New Zealand|             1|
|  Australia|             1|
|   Pakistan|             1|
+-----------+--------------+

相关问题