spark3与Pandas矢量自定义项的

llycmphe  于 2021-05-27  发布在  Spark
关注(0)|答案(2)|浏览(467)

我正在研究在pyspark(v3)中使用pandas-udf。出于许多原因,我理解迭代和udf通常是不好的,我理解我在这里展示的简单示例可以使用sql函数来完成—所有这些都是多余的!
我一直遵循这个指南:https://databricks.com/blog/2020/05/20/new-pandas-udfs-and-python-type-hints-in-the-upcoming-release-of-apache-spark-3-0.html
我有一个简单的例子从文件工作:

  1. import pandas as pd
  2. from typing import Iterator, Tuple
  3. from pyspark.sql import SparkSession
  4. from pyspark.sql.functions import col, pandas_udf
  5. spark = SparkSession.builder.getOrCreate()
  6. pdf = pd.DataFrame(([1, 2, 3], [4, 5, 6], [8, 9, 0]), columns=["x", "y", "z"])
  7. df = spark.createDataFrame(pdf)
  8. @pandas_udf('long')
  9. def test1(x: pd.Series, y: pd.Series) -> pd.Series:
  10. return x + y
  11. df.select(test1(col("x"), col("y"))).show()

这对于执行基本的算术很有效-如果我想加法、乘法等,这是直接的(但是在没有函数的pyspark中也很简单)。
我想对这些值进行比较,例如:

  1. @pandas_udf('long')
  2. def test2(x: pd.Series, y: pd.Series) -> pd.Series:
  3. return x if x > y else y
  4. df.select(test2(col("x"), col("y"))).show()

这将与 ValueError: The truth value of a Series is ambiguous. Use a.empty, a.bool(), a.item(), a.any() or a.all(). . 我知道它是在计算序列而不是行值。
这里有一个迭代器的例子。同样,对于他们提供的基本算术示例,这也很好。但如果我尝试运用逻辑:

  1. @pandas_udf("long")
  2. def test3(batch_iter: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
  3. for x, y in batch_iter:
  4. yield x if x > y else y
  5. df.select(test3(col("x"), col("y"))).show()

我得到了和以前一样的值错误。
所以我的问题是,我应该如何进行这样的逐行比较?在矢量化函数中有可能吗?如果没有,那么它们的用例是什么?

k3bvogb1

k3bvogb11#

我想出来了。在你把问题写下来并向全世界公布之后就这么简单了。
需要做的就是返回一个数组,然后转换为一个系列:

  1. @pandas_udf('long')
  2. def test4(x: pd.Series, y: pd.Series) -> pd.Series:
  3. return pd.Series([a if a > b else b for a, b in zip(x, y)])
  4. df.select(test4(col("x"),col("y"))).show()
7uzetpgm

7uzetpgm2#

我花了两天的时间来寻找这个答案,谢谢你西蒙·德莫瑞斯!
我需要一个稍微修改一下的例子。为了便于管理,我将单个udf分解为多个组件。下面是一个我用来给别人参考的例子:

  1. xdf = pd.DataFrame(([1, 2, 3,'Fixed'], [4, 5, 6,'Variable'], [8, 9, 0,'Adjustable']), columns=["x", "y", "z", "Description"])
  2. df = spark.createDataFrame(xdf)
  3. def fnRate(x):
  4. return pd.Series(['Fixed' if 'Fixed' in str(v) else 'Variable' if 'Variable' in str(v) else 'Other' for v in zip(x)])
  5. @pandas_udf('string')
  6. def fnRateRecommended(Description: pd.Series) -> pd.Series:
  7. varProduct = fnRate(Description)
  8. return varProduct
  9. # call function
  10. df.withColumn("Recommendation", fnRateRecommended(sf.col("Description"))).show()
展开查看全部

相关问题