如何用另一个Dataframe的随机值更新pyspark中的Dataframe?

pinkon5k  于 2021-07-13  发布在  Spark
关注(0)|答案(2)|浏览(315)

我在pyspark中有两个Dataframe,如下所示:
Dataframea:共1000条记录

+-----+
|Name |
+-----+
|    a|
|    b|
|    c|
+-----+

Dataframeb:共3条记录

+-----+
|Zip  |
+-----+
|06905|
|06901|
|06902|
+-----+

我需要在dataframea中添加一个名为zip的新列,并用从dataframeb中随机选择的值填充这些值。所以Dataframea看起来像这样:

+-----+-----+
|Name |Zip  |
+-----+-----+
|    a|06901|
|    b|06905|
|    c|06902|
|    d|06902|
+-----+-----+

请帮我写这个代码。非常感谢你的帮助。
我在azure数据库上运行这个,显然,quinn不是其中的一个模块。所以很不幸不能用奎因。

vhmi4jdf

vhmi4jdf1#

如果 b 很小(3行),您只需将其收集到python列表中,并将其作为数组列添加到 a . 然后你可以使用 shuffle .

import pyspark.sql.functions as F

df = a.withColumn(
    'Zip',
    F.shuffle(
        F.array(*[F.lit(r[0]) for r in b.collect()])
    )[0]
)

df.show()
+----+-----+
|Name|  Zip|
+----+-----+
|   a|06901|
|   b|06905|
|   c|06902|
|   d|06901|
+----+-----+
hfsqlsce

hfsqlsce2#

您可以使用zips对Dataframe进行聚合,并将值收集到一个数组列中,然后执行交叉连接并从zips数组中选择一个随机元素,例如 shuffle 在拾取第一个元素之前:

from pyspark.sql import functions as F

df_result = df_a.crossJoin(
    df_b.agg(F.collect_list("Zip").alias("Zip"))
).withColumn(
    "Zip",
    F.expr("shuffle(Zip)[0]")
)

# +----+-----+

# |Name|  Zip|

# +----+-----+

# |   a|06901|

# |   b|06902|

# |   c|06901|

# |   d|06901|

# +----+-----+

相关问题