python - Multiply a DataFrame column by another DataFrame's column based on a condition

xjreopfe · published 2022-12-28 in Python

I have two DataFrames: one is an info table and the other is a reference table. I need to multiply two columns based on a condition. Details below:
DataFrame (info)

+-----+-----+
|  key|value|
+-----+-----+
|    a|   10|
|    b|   20|
|    c|   50|
|    d|   40|
+-----+-----+

DataFrame (reference)

+-----+----------+
|  key|percentage|
+-----+----------+
|    a|       0.1|
|    b|       0.5|
+-----+----------+

DataFrame (this is my desired output)

+-----+------+
|  key|result|
+-----+------+
|    a|     1|   (10 * 0.1 = 1)
|    b|    10|   (20 * 0.5 = 10)
|    c|    50|   (no matching key in the reference table, so the value stays the same)
|    d|    40|   (no matching key in the reference table, so the value stays the same)
+-----+------+
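For clarity, the intended row-level logic can be modeled in plain Python (a hypothetical sketch using dicts, not PySpark): multiply each value by its matching percentage, defaulting to 1 when the key has no match in the reference table.

```python
# Hypothetical plain-Python model of the desired output (not PySpark).
info = {"a": 10, "b": 20, "c": 50, "d": 40}
reference = {"a": 0.1, "b": 0.5}

# Multiply by the matching percentage, defaulting to 1 when the key is absent.
result = {key: value * reference.get(key, 1) for key, value in info.items()}
print(result)  # {'a': 1.0, 'b': 10.0, 'c': 50, 'd': 40}
```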

I have tried the code below, but it fails.

df_cal = (
    info
    .withColumn('result', f.when(f.col('key')==reference.withColumn(f.col('key')), \
                          f.col('value')*reference.withColumn(f.col('percentage')) ))
    .select('key', 'result')
)

df_cal.show()

voase2hg1#

Join and multiply. Code and logic below:

from pyspark.sql.functions import broadcast, col

new_info = (
    info.join(broadcast(reference), on='key', how='left')  # left-join the two dataframes
    .na.fill(1.0)  # fill null percentage with 1
    .withColumn('result', col('value') * col('percentage'))  # multiply and store in result
    .drop('value', 'percentage')  # drop unwanted columns
)

new_info.show()

dba5bblo2#

While the overall logic stays the same, the subtle difference from WWNDE's solution is using coalesce rather than fillna. If fillna is used without a subset, it can also fill unwanted columns, and in any case it generates a new projection in the Spark plan.
Example using coalesce:

from pyspark.sql import functions as func

# data1_sdf and data2_sdf correspond to the info and reference dataframes
data1_sdf. \
    join(data2_sdf, ['key'], 'left'). \
    withColumn('result',
               func.coalesce(func.col('value') * func.col('percentage'), func.col('value'))
               ). \
    show()

# +---+-----+----------+------+
# |key|value|percentage|result|
# +---+-----+----------+------+
# |  d|   40|      null|  40.0|
# |  c|   50|      null|  50.0|
# |  b|   20|       0.5|  10.0|
# |  a|   10|       0.1|   1.0|
# +---+-----+----------+------+
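The subset caveat mentioned above can be illustrated with a hypothetical plain-Python analogue (not PySpark): a blanket fill replaces every null field in a row, while a targeted fill (akin to PySpark's `df.na.fill(1.0, subset=['percentage'])`) only touches the intended column.

```python
# Hypothetical rows after a left join; None marks nulls (plain Python, not PySpark).
rows = [
    {"key": "c", "value": None, "percentage": None},  # imagine 'value' is also null
]

# Blanket fill (like df.na.fill(1.0) without a subset): every null becomes 1.0.
blanket = [{k: (1.0 if v is None else v) for k, v in row.items()} for row in rows]

# Targeted fill (like df.na.fill(1.0, subset=['percentage'])): only 'percentage' changes.
targeted = [
    {k: (1.0 if v is None and k == "percentage" else v) for k, v in row.items()}
    for row in rows
]

print(blanket)   # 'value' was unintentionally filled too
print(targeted)  # 'value' stays None, only 'percentage' was filled
```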

3qpi33ja3#

If you are willing to use Spark SQL instead of the DataFrame API, you can do it this way.
Create the DataFrames (optional, since you already have the data).

from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, StringType

# create info dataframe
info_data = [
  ("a",10),
  ("b",20),
  ("c",50),
  ("d",40),
]
info_schema = StructType([
  StructField("key",StringType()),
  StructField("value",IntegerType()),
])
info_df = spark.createDataFrame(data=info_data,schema=info_schema)

# create reference dataframe
reference_data = [
  ("a",.1),
  ("b",.5)
]
reference_schema = StructType([
  StructField("key",StringType()),
  StructField("percentage",FloatType()),
])
reference_df = spark.createDataFrame(data=reference_data,schema=reference_schema)
reference_df.show()

Next, we need to create views over the two DataFrames so that we can run SQL queries against them. Below we create a view named info from info_df and a view named reference from reference_df.

# create views: info and reference
info_df.createOrReplaceTempView("info")
reference_df.createOrReplaceTempView("reference")

Finally, we write the query that performs the multiplication. The query does a left join between info and reference and then multiplies value by percentage. The key part is that we coalesce percentage with 1, so when percentage is null, value is multiplied by 1.

my_query = """
select
  i.key,
  -- coalesce percentage with 1; if percentage is null it is replaced by 1
  i.value * coalesce(r.percentage,1) as result
from info i
left join reference r
  on i.key = r.key
"""

final_df = spark.sql(my_query)
final_df.show()

Output:

+---+------+
|key|result|
+---+------+
|  a|   1.0|
|  b|  10.0|
|  c|  50.0|
|  d|  40.0|
+---+------+
