Spark Scala: counting unique values of a column into a map column

Posted by bvjveswy on 2021-05-16 in Spark

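How do I correctly get a column's values as a Map(k -> v), where k is a unique value and v is the number of times it occurs? I am doing this inside a groupBy.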

val getMapUDF = udf((arr: Array[Long]) => {arr.groupBy(identity).map{ case (x,y) => x -> y.size}})

df
    .withWatermark("time", "30 seconds")
    .groupBy(window(col("time"), "1 minutes").alias("someTime"), col("foo"), col("bar"))
    .agg(count("*").alias("rowCount"), collect_list(col("aaa")).alias("aaaList"))
    .withColumn("qtypes", getMapUDF(col("foobar")))

Edit: input

+-----+-----+-------------------+
| foo | bar | foobar            |
+-----+-----+-------------------+
| aaa | a   | [1,1,1,2,3,3]     |
| bbb | b   | [1,2,3,1,2]       |
+-----+-----+-------------------+

Expected output

+-----+-----+--------------------+
| foo | bar | foobarMap          |
+-----+-----+--------------------+
| aaa | a   | [1->3, 2->1, 3->2] |
| bbb | b   | [1->2, 2->2, 3->1] |
+-----+-----+--------------------+

Question: can I use map_from_arrays for this?

dsekswqp (answer #1)

Given an array arr, I think this is what you are looking for:

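// Sample input matching the first row of the question.
val arr: Array[Long] = Array(1, 1, 1, 2, 3, 3)

// Group equal values together, then replace each group by its size.
// The result contains 1 -> 3, 2 -> 1, 3 -> 2 (entry order may vary).
arr.groupBy(identity).mapValues(_.size)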
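
The same counting can be wrapped in a named function. The sketch below is plain Scala with no Spark dependency, and `countValues` is a hypothetical name that does not appear in the question. It takes a `Seq[Long]` because, as far as I know, Spark hands array columns to Scala UDFs as a `Seq` rather than an `Array`. It also builds a strict `Map` with `map { case ... }` instead of `mapValues`, which returns a lazy view in Scala 2.13 and a non-serializable wrapper in 2.12.

```scala
// Hypothetical helper: counts how often each distinct value occurs.
def countValues(values: Seq[Long]): Map[Long, Int] =
  values.groupBy(identity).map { case (value, group) => value -> group.size }

// First row of the question's input.
val counts = countValues(Seq(1L, 1L, 1L, 2L, 3L, 3L))
println(counts)

// Assumption: with Spark on the classpath, this could be registered as
//   val getMapUDF = org.apache.spark.sql.functions.udf(countValues _)
```

An empty input simply yields an empty map, so no special case is needed for groups without values.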

vsnjm48y (answer #2)

I think you can use something other than collect_list, so that you get what you want without doing two groupBy calls. I assume your input data is the df shown below.

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._  // for the $"..." column syntax; spark-shell imports this automatically

df.show
+---+---+---+
|foo|bar|aaa|
+---+---+---+
|aaa|  a|  1|
|aaa|  a|  1|
|aaa|  a|  1|
|aaa|  a|  2|
|aaa|  a|  3|
|aaa|  a|  3|
|bbb|  b|  1|
|bbb|  b|  2|
|bbb|  b|  3|
|bbb|  b|  1|
|bbb|  b|  2|
+---+---+---+

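val df2 = df.withColumn(
    "foobarmap",
    // Pair each value with its count within the same (foo, bar, aaa) partition.
    struct(
        $"aaa",
        count("aaa").over(Window.partitionBy("foo", "bar", "aaa"))
    )
).groupBy(
    "foo", "bar"
).agg(
    count("*").alias("rowcount"),
    // collect_set removes the duplicate (value, count) pairs before the map is built.
    map_from_entries(collect_set("foobarmap")).alias("foobarmap")
).orderBy("foo")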

df2.show(2,0)
+---+---+--------+------------------------+
|foo|bar|rowcount|foobarmap               |
+---+---+--------+------------------------+
|aaa|a  |6       |[2 -> 1, 3 -> 2, 1 -> 3]|
|bbb|b  |5       |[2 -> 2, 3 -> 1, 1 -> 2]|
+---+---+--------+------------------------+

To add the watermark and group by window, the code can be written as follows:

val df2 = df.withWatermark(
    "time", "30 seconds"
).withColumn(
    "foobarmap",
    struct(
        $"aaa",
        // partitionBy and groupBy take either all Columns or all Strings,
        // so the column names are wrapped in col() next to window(...).
        count("aaa").over(Window.partitionBy(window(col("time"), "1 minutes"), col("foo"), col("bar"), col("aaa")))
    )
).groupBy(
    window(col("time"), "1 minutes"), col("foo"), col("bar")
).agg(
    count("*").alias("rowcount"),
    map_from_entries(collect_set("foobarmap")).alias("foobarmap")
).orderBy("foo")
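
On the question's map_from_arrays idea: one way it might be used, assuming Spark 2.4 or later (where `map_from_arrays`, `array_distinct` and the SQL higher-order functions `transform` and `filter` are available), is to take the distinct values as keys and count each key by filtering the array. This is only a sketch and not part of the answer above; the local SparkSession and the output column name `foobarMap` are assumptions. It works on an array column that has already been collected, so it could follow a `collect_list` aggregation. That may matter for streaming input because, as far as I know, `over(Window.partitionBy(...))` is not supported on streaming DataFrames.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, expr}

// Assumption: a local session for illustration; spark-shell already provides `spark`.
val spark = SparkSession.builder()
  .master("local[1]")
  .appName("map-from-arrays-sketch")
  .getOrCreate()
import spark.implicits._

// Same shape as the question's input: one array column per (foo, bar).
val df = Seq(
  ("aaa", "a", Seq(1L, 1L, 1L, 2L, 3L, 3L)),
  ("bbb", "b", Seq(1L, 2L, 3L, 1L, 2L))
).toDF("foo", "bar", "foobar")

// Keys: the distinct values. Values: for each key x, the number of elements equal to x.
val result = df.withColumn(
  "foobarMap",
  expr(
    """map_from_arrays(
      |  array_distinct(foobar),
      |  transform(array_distinct(foobar), x -> size(filter(foobar, y -> y = x)))
      |)""".stripMargin)
)

result.show(false)
```

Filtering the array once per distinct value is quadratic in the array length, which is usually acceptable for short arrays; for long ones a UDF or a pre-aggregation is likely the better fit.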

iyr7buue (answer #3)

So, if you just want to replace the UDF with Spark SQL API / column transformations, this may be what you want:

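// Sample data matching the question's input.
val data = Seq(
  ("aaa", "a", Array(1, 1, 1, 2, 3, 3)),
  ("bbb", "b", Array(1, 2, 3, 1, 2))
)

val df = spark.createDataset(data).toDF("foo", "bar", "foobar")

// explode_outer names its output column "col". Count each (foo, value) pair,
// wrap it in a single-entry map, then collect the maps per foo.
val res = df.select($"foo", explode_outer($"foobar"))
  .groupBy("foo", "col").count()
  .withColumn("mapped", map($"col", $"count"))
  .groupBy("foo")
  .agg(collect_list("mapped"))

res.show(false)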

So you will get this:

+---+------------------------------+
|foo|collect_list(mapped)          |
+---+------------------------------+
|aaa|[[3 -> 2], [1 -> 3], [2 -> 1]]|
|bbb|[[2 -> 2], [1 -> 2], [3 -> 1]]|
+---+------------------------------+
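
The output above is a list of single-entry maps rather than one map per row. If a single map is wanted, a variation along the following lines might work, assuming Spark 2.4 or later for `map_from_entries`. It is a sketch rather than the answer's own code: the local SparkSession and the names `value` and `foobarMap` are assumptions, and it uses `explode` instead of `explode_outer` so that no null key reaches the map (rows with an empty or null array are dropped as a result).

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, collect_list, explode, map_from_entries, struct}

// Assumption: a local session for illustration; spark-shell already provides `spark`.
val spark = SparkSession.builder()
  .master("local[1]")
  .appName("map-from-entries-sketch")
  .getOrCreate()
import spark.implicits._

val df = Seq(
  ("aaa", "a", Seq(1, 1, 1, 2, 3, 3)),
  ("bbb", "b", Seq(1, 2, 3, 1, 2))
).toDF("foo", "bar", "foobar")

// One row per array element, then one row per distinct (foo, bar, value) with its count.
val counted = df
  .select(col("foo"), col("bar"), explode(col("foobar")).alias("value"))
  .groupBy("foo", "bar", "value")
  .count()

// Collect the (value, count) pairs of each group and turn them into a single map.
val res = counted
  .groupBy("foo", "bar")
  .agg(map_from_entries(collect_list(struct(col("value"), col("count")))).alias("foobarMap"))

res.show(false)
```

Because every (foo, bar, value) combination appears only once after the first aggregation, `collect_list` cannot produce duplicate keys here.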

Hope this helps.
