spark collect\ list将数据类型从数组更改为字符串

ev7lccsx  于 2021-05-24  发布在  Spark
关注(0)|答案(2)|浏览(736)

我有一个下面的问题

val df_date_agg = df
    .groupBy($"a",$"b",$"c")
    .agg(sum($"d").alias("data1"),sum($"e").alias("data2"))
    .groupBy($"a")
    .agg(collect_list(array($"b",$"c",$"data1")).alias("final_data1"),
         collect_list(array($"b",$"c",$"data2")).alias("final_data2"))

在这里,我做了一些汇总和收集的结果 collect_list . 早些时候我们使用spark 1,它给了我以下数据类型。

|-- final_data1: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- final_data2: array (nullable = true)
 |    |-- element: string (containsNull = true)

现在我们必须迁移到spark2,但是我们正在得到下面的模式。

|-- final_data1: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)
 |-- final_data1: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)

在得到 first() 下面的记录就是区别

spark 1.6

[2020-09-26, Ayush, 103.67] => datatype string

spark 2 

WrappedArray(2020-09-26, Ayush, 103.67)

如何保持相同的数据类型?
编辑-尝试使用concat
我得到类似spark 1.6的精确模式的一种方法是使用如下concat

val df_date_agg = df
    .groupBy($"msisdn",$"event_date",$"network")
    .agg(sum($"data_mou").alias("data_mou_dly"),sum($"voice_mou").alias("voice_mou_dly"))
    .groupBy($"msisdn")
    .agg(collect_list(concat(lit("["),lit($"event_date"),lit(","),lit($"network"),lit(","),lit($"data_mou_dly"),lit("]")))

它会影响我的代码性能吗??有没有更好的办法?

w80xi6nr

w80xi6nr1#

既然您需要数组的字符串表示,那么将数组转换成这样的字符串如何?

val df_date_agg = df
    .groupBy($"a",$"b",$"c")
    .agg(sum($"d").alias("data1"),sum($"e").alias("data2"))
    .groupBy($"a")
    .agg(collect_list(array($"b",$"c",$"data1") cast "string").alias("final_data1"),
         collect_list(array($"b",$"c",$"data2") cast "string").alias("final_data2"))

它可能只是你的旧版本的Spark在做什么。
顺便说一句,您建议的解决方案可能也会起作用,但是可以用 lit 没有必要( lit($"event_date") ). $"event_date" 够了。

vhipe2zx

vhipe2zx2#

flltening final1和final2列可以解决这个问题。

val data = Seq((1,"A", "B"), (1, "C", "D"), (2,"E", "F"), (2,"G", "H"), (2,"I", "J"))

val df = spark.createDataFrame(
  data
).toDF("col1", "col2", "col3")

val old_df = df.groupBy(col("col1")).agg(
    collect_list(
        array(
            col("col2"), 
            col("col3")
            )
    ).as("final")
    )
val new_df = old_df.select(col("col1"), flatten(col("final")).as("final_new"))
println("Input Dataframe")

df.show(false)
println("Old schema format")
old_df.show(false)
old_df.printSchema()

println("New schema format")
new_df.show(false)
new_df.printSchema()

输出:

Input Dataframe
+----+----+----+
|col1|col2|col3|
+----+----+----+
|1   |A   |B   |
|1   |C   |D   |
|2   |E   |F   |
|2   |G   |H   |
|2   |I   |J   |
+----+----+----+

Old schema format
+----+------------------------+
|col1|final                   |
+----+------------------------+
|1   |[[A, B], [C, D]]        |
|2   |[[E, F], [G, H], [I, J]]|
+----+------------------------+

root
 |-- col1: integer (nullable = false)
 |-- final: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)

New schema format
+----+------------------+
|col1|final_new         |
+----+------------------+
|1   |[A, B, C, D]      |
|2   |[E, F, G, H, I, J]|
+----+------------------+

root
 |-- col1: integer (nullable = false)
 |-- final_new: array (nullable = true)
 |    |-- element: string (containsNull = true)

在你的特殊情况下

val df_date_agg = df
    .groupBy($"a",$"b",$"c")
    .agg(sum($"d").alias("data1"),sum($"e").alias("data2"))
    .groupBy($"a")
    .agg(collect_list(array($"b",$"c",$"data1")).alias("final_data1"),
         collect_list(array($"b",$"c",$"data2")).alias("final_data2"))
         .select(flatten(col("final_data1").as("final_data1"), flatten(col("final_data2).as("final_data2))

相关问题