apachespark—连接结构数组的方法

mspsb9vt  于 2021-05-27  发布在  Spark
关注(0)|答案(2)|浏览(374)

我有一个包含结构数组的列。看起来是这样的:

|-- Network: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Code: string (nullable = true)
 |    |    |-- Signal: string (nullable = true)

这只是一个小示例,结构中的列比这个多得多。有没有一种方法可以将每一行的列中的数组连接起来,并使它们成为一个字符串?例如,我们可以有这样的东西:

[["example", 2], ["example2", 3]]

有没有一种方法可以:

"example2example3"?
jhkqcmku

jhkqcmku1#

假设有一个Dataframe df 使用以下模式:

df.printSchema

df 样本数据:

df.show(false)


您需要首先分解网络数组以选择结构元素代码和信号。

var myDf = df.select(explode($"Network").as("Network"))

然后需要使用concat()函数将两列连接起来,然后将输出传递给collect\u list()函数,该函数将所有行聚合为一行array

myDf = myDf.select(collect_list(concat($"Network.code",$"Network.signal")).as("data"))

最后,您需要将concat转换成所需的格式,这可以使用concat\u ws()函数来完成,该函数有两个参数,第一个参数是放置在两个字符串之间的分隔符,第二个参数是一个带有array类型的列,这是我们上一步的输出。根据您的用例,我们不需要在两个串联字符串之间放置任何分隔符,因此我们将分隔符参数保留为空引号。

myDf = myDf.select(concat_ws("",$"data").as("data"))

以上所有步骤都可以在一行中完成

myDf= myDf.select(explode($"Network").as("Network")).select(concat_ws("",collect_list(concat($"Network.code",$"Network.signal"))).as("data")).show(false)


如果要将输出直接输入字符串变量,请使用:

val myStr = myDf.first.get(0).toString
print(myStr)

zpqajqem

zpqajqem2#

有一个名为spark hats(github,小文章)的库,您可能会发现它在这些情况下非常有用。
使用它,您可以很容易地Map数组,并在元素旁边输出连接,如果您提供完全限定的名称,甚至可以在其他地方输出连接。

设置

import org.apache.spark.sql.functions._
import za.co.absa.spark.hats.Extensions._

scala> df.printSchema
root
 |-- info: struct (nullable = true)
 |    |-- drivers: struct (nullable = true)
 |    |    |-- carName: string (nullable = true)
 |    |    |-- carNumbers: string (nullable = true)
 |    |    |-- driver: string (nullable = true)
 |-- teamName: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- team1: string (nullable = true)
 |    |    |-- team2: string (nullable = true)

scala> df.show(false)
+---------------------------+------------------------------+
|info                       |teamName                      |
+---------------------------+------------------------------+
|[[RB7, 33, Max Verstappen]]|[[Redbull, rb], [Monster, mt]]|
+---------------------------+------------------------------+

你要找的命令

scala> val dfOut = df.nestedMapColumn(inputColumnName = "teamName", outputColumnName = "nextElementInArray", expression = a => concat(a.getField("team1"), a.getField("team2")) )
dfOut: org.apache.spark.sql.DataFrame = [info: struct<drivers: struct<carName: string, carNumbers: string ... 1 more field>>, teamName: array<struct<team1:string,team2:string,nextElementInArray:string>>]

输出

scala> dfOut.printSchema
root
 |-- info: struct (nullable = true)
 |    |-- drivers: struct (nullable = true)
 |    |    |-- carName: string (nullable = true)
 |    |    |-- carNumbers: string (nullable = true)
 |    |    |-- driver: string (nullable = true)
 |-- teamName: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- team1: string (nullable = true)
 |    |    |-- team2: string (nullable = true)
 |    |    |-- nextElementInArray: string (nullable = true)

scala> dfOut.show(false)
+---------------------------+----------------------------------------------------+
|info                       |teamName                                            |
+---------------------------+----------------------------------------------------+
|[[RB7, 33, Max Verstappen]]|[[Redbull, rb, Redbullrb], [Monster, mt, Monstermt]]|
+---------------------------+----------------------------------------------------+

相关问题