在并行scala中处理多个Dataframe

e7arh2l6  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(611)

我是scala spark的新手。我有一个像下面这样的Dataframe,我需要根据一个组id将数据分成不同的数据块,并独立地并行处理它们。

+----+-------+-----+-------+
|user|feature|value|groupID
+----+-------+-----+-------+
|   1|    100|    1|      A|
|   2|    20B|    0|      B|
|   3|    30A|    1|      B|
|   4|    40A|    1|      B| 
|   5|    50A|    1|      A|
|   6|    10A|    0|      B|
|   7|    200|    1|      A|
|   8|    30B|    1|      B|
|   9|    400|    0|      A|
|  10|    50C|    0|      A|
+----+-------+-----+-------+

第一步我需要分裂它有两个不同的df像这样的:我可以为这个用户一个过滤器。但我不确定(由于它们将产生大量不同的Dataframe),我是否应该将它们作为parquet保存到adl中,或者将它们保存在内存中。

+----+-------+-----+-------+
|user|feature|value|groupID
+----+-------+-----+-------+
|   1|    100|    1|      A|
|   5|    50A|    1|      A|
|   7|    200|    1|      A|
|   9|    400|    0|      A|
|  10|    50C|    0|      A|
+----+-------+-----+-------+

+----+-------+-----+-------+
|user|feature|value|groupID
+----+-------+-----+-------+
|   2|    20B|    0|      B|
|   3|    30A|    1|      B|
|   4|    40A|    1|      B| 
|   6|    10A|    0|      B|
|   8|    30B|    1|      B|
+----+-------+-----+-------+

两步并行独立处理每个Dataframe,得到独立处理的Dataframe。
给出一些背景:
groupid的数量将很高,因此它们不能硬编码。
理想情况下,每个Dataframe的处理将并行进行。
我想了解一下如何继续:我已经看到了.par.foreach(但我不清楚如何将其应用于动态数目的Dataframe,以及如何独立存储它们,也不清楚是否是最有效的方法)

gz5pxeao

gz5pxeao1#

检查以下代码。

scala> df.show(false)
+----+-------+-----+-------+
|user|feature|value|groupID|
+----+-------+-----+-------+
|1   |100    |1    |A      |
|2   |20B    |0    |B      |
|3   |30A    |1    |B      |
|4   |40A    |1    |B      |
|5   |50A    |1    |A      |
|6   |10A    |0    |B      |
|7   |200    |1    |A      |
|8   |30B    |1    |B      |
|9   |400    |0    |A      |
|10  |50C    |0    |A      |
+----+-------+-----+-------+

得到 distinct dataframe中的groupid值。

scala> val groupIds = df.select($"groupID").distinct.as[String].collect // Get distinct group ids.
groupIds: Array[String] = Array(B, A)

使用 .par 用于并行处理。你需要把你的逻辑加进去 map .

scala> groupIds.par.map(groupid => df.filter($"groupId" === lit(groupid))).foreach(_.show(false)) // here you might need add your logic to save or any other inside map function not foreach.., for example I have added logic to show dataframe content in foreach.
+----+-------+-----+-------+
|user|feature|value|groupID|
+----+-------+-----+-------+
|2   |20B    |0    |B      |
|3   |30A    |1    |B      |
|4   |40A    |1    |B      |
|6   |10A    |0    |B      |
|8   |30B    |1    |B      |
+----+-------+-----+-------+

+----+-------+-----+-------+
|user|feature|value|groupID|
+----+-------+-----+-------+
|1   |100    |1    |A      |
|5   |50A    |1    |A      |
|7   |200    |1    |A      |
|9   |400    |0    |A      |
|10  |50C    |0    |A      |
+----+-------+-----+-------+

相关问题