PicklingError: Could not serialize object: TypeError: cannot pickle '_thread.RLock' object when passing a Row to a UDF

kb5ga3dv  posted on 2022-11-01  in Spark

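transactions_df is the DF I am running my UDF on. Inside the UDF, I reference another DF (currency_exchange_df) to look up values from it based on certain conditions.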

def convertRate(row):
    completed = row["completedAt"]
    currency = row["currency"]
    amount = row["amount"]
    if currency == "MXN":
        rate = currency_exchange_df.select("rate").where((transactions_df.to =="MXN") & (completed>=col("effectiveAt")) & (completed< col("effectiveTill")))
        amount = amount/rate
    final_rate = currency_exchange_df.select("rate").where((transactions_df.to =="CAD") & (completed>=col("effectiveAt")) & (completed< col("effectiveTill")))
    converted = amount*final_rate
    return converted

convertUDF = f.udf(lambda row: convertRate(row), DoubleType())

To call the UDF, I pass the Row as a struct.

temp = transactions_df.withColumn("newAmount", convertUDF(f.struct([transactions_df[x] for x in transactions_df.columns])))
temp.show()

I get the following error:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
File ~\AppData\Local\Programs\Python\Python310\lib\site-packages\pyspark\serializers.py:437, in CloudPickleSerializer.dumps(self, obj)
    436 try:
--> 437     return cloudpickle.dumps(obj, pickle_protocol)
    438 except pickle.PickleError:

File ~\AppData\Local\Programs\Python\Python310\lib\site-packages\pyspark\cloudpickle\cloudpickle_fast.py:72, in dumps(obj, protocol, buffer_callback)
     69 cp = CloudPickler(
     70     file, protocol=protocol, buffer_callback=buffer_callback
     71 )
---> 72 cp.dump(obj)
     73 return file.getvalue()

File ~\AppData\Local\Programs\Python\Python310\lib\site-packages\pyspark\cloudpickle\cloudpickle_fast.py:540, in CloudPickler.dump(self, obj)
    539 try:
--> 540     return Pickler.dump(self, obj)
    541 except RuntimeError as e:

TypeError: cannot pickle '_thread.RLock' object

During handling of the above exception, another exception occurred:

PicklingError                             Traceback (most recent call last)
Input In [40], in <cell line: 1>()
----> 1 temp = transactions_df.withColumn("newAmount", convertUDF(f.struct([transactions_df[x] for x in transactions_df.columns])))
  2 temp.show()

File ~\AppData\Local\Programs\Python\Python310\lib\site-packages\pyspark\sql\udf.py:199, in UserDefinedFunction._wrapped.<locals>.wrapper(*args)
    197 @functools.wraps(self.func, assigned=assignments)
    198 def wrapper(*args):
--> 199     return self(*args)

File ~\AppData\Local\Programs\Python\Python310\lib\site-packages\pyspark\sql\udf.py:177, in UserDefinedFunction.__call__(self, *cols)
    176 def __call__(self, *cols):
--> 177     judf = self._judf
    178     sc = SparkContext._active_spark_context
    179     return Column(judf.apply(_to_seq(sc, cols, _to_java_column)))

File ~\AppData\Local\Programs\Python\Python310\lib\site-packages\pyspark\sql\udf.py:161, in UserDefinedFunction._judf(self)
    154 @property
    155 def _judf(self):
    156     # It is possible that concurrent access, to newly created UDF,
    157     # will initialize multiple UserDefinedPythonFunctions.
    158     # This is unlikely, doesn't affect correctness,
    159     # and should have a minimal performance impact.
    160     if self._judf_placeholder is None:
--> 161         self._judf_placeholder = self._create_judf()
    162     return self._judf_placeholder

File ~\AppData\Local\Programs\Python\Python310\lib\site-packages\pyspark\sql\udf.py:170, in UserDefinedFunction._create_judf(self)
    167 spark = SparkSession.builder.getOrCreate()
    168 sc = spark.sparkContext
--> 170 wrapped_func = _wrap_function(sc, self.func, self.returnType)
    171 jdt = spark._jsparkSession.parseDataType(self.returnType.json())
    172 judf = sc._jvm.org.apache.spark.sql.execution.python.UserDefinedPythonFunction(
    173     self._name, wrapped_func, jdt, self.evalType, self.deterministic)

File ~\AppData\Local\Programs\Python\Python310\lib\site-packages\pyspark\sql\udf.py:34, in _wrap_function(sc, func, returnType)
     32 def _wrap_function(sc, func, returnType):
     33     command = (func, returnType)
---> 34     pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
     35     return sc._jvm.PythonFunction(bytearray(pickled_command), env, includes, sc.pythonExec,
     36                                   sc.pythonVer, broadcast_vars, sc._javaAccumulator)

File ~\AppData\Local\Programs\Python\Python310\lib\site-packages\pyspark\rdd.py:2814, in _prepare_for_python_RDD(sc, command)
   2811 def _prepare_for_python_RDD(sc, command):
   2812     # the serialized command will be compressed by broadcast
   2813     ser = CloudPickleSerializer()
-> 2814     pickled_command = ser.dumps(command)
   2815     if len(pickled_command) > sc._jvm.PythonUtils.getBroadcastThreshold(sc._jsc):  # Default 1M
   2816         # The broadcast will have same life cycle as created PythonRDD
   2817         broadcast = sc.broadcast(pickled_command)

File ~\AppData\Local\Programs\Python\Python310\lib\site-packages\pyspark\serializers.py:447, in CloudPickleSerializer.dumps(self, obj)
    445     msg = "Could not serialize object: %s: %s" % (e.__class__.__name__, emsg)
    446 print_exec(sys.stderr)
--> 447 raise pickle.PicklingError(msg)

    PicklingError: Could not serialize object: TypeError: cannot pickle '_thread.RLock' object

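The sample DFs are as follows:
The first DF is my transactions_df and the second DF contains the exchange rates.
The transactions provided are in USD or MXN (Mexican pesos), and the currency exchange data only contains the "effectiveAt" date. Assume that a rate stays in effect until a new record with a newer rate is provided.
I have to convert all transactions to CAD. Note that MXN must first be converted to USD, and then USD to CAD.
The third DF is the expected result.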

Answer #1 (ehxuflar)

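Your initial script is only meant to do the conversion. But the screenshot shows that other transformations are performed as well: grouping, aggregation and pivoting. All of that seems like too much for one question. I did it anyway, just to show that it is entirely feasible using only DataFrames. The code may look longer, but this way it is easier to manage and more efficient.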
Inputs:

from pyspark.sql import functions as F, Window as W
transactions_df = spark.createDataFrame(
    [(7, '2021-10-01', 'USD', 30.0, 'DEBIT', '2021-10-01'),
     (9, '2021-10-02', 'USD', 10.0, 'DEBIT', '2021-10-02'),
     (6, '2021-10-03', 'USD', 29.99, 'CREDIT', '2021-10-03'),
     (2, '2021-10-03', 'USD', 29.99, 'CREDIT', '2021-10-03'),
     (1, '2021-10-04', 'USD', 29.99, 'CREDIT', '2021-10-04'),
     (4, '2021-10-04', 'USD', 49.99, 'CREDIT', '2021-10-04'),
     (8, '2021-10-05', 'USD', 9.99, 'DEBIT', '2021-10-05'),
     (3, '2021-10-06', 'MXN', 621.42, 'CREDIT', '2021-10-06'),
     (5, '2021-10-07', 'USD', 35.99, 'CREDIT', '2021-10-07')],
    ['id', 'completedAt', 'currency', 'amount', 'type', 'completedDate'])

currency_exchange_df = spark.createDataFrame(
    [(3, 'USD', 'MXN', 20.44, '2021-09-28', '2021-10-05'),
     (4, 'USD', 'CAD', 1.35, '2021-09-28', '2021-10-04'),
     (2, 'USD', 'CAD', 1.33, '2021-10-04', '9999-12-31'),
     (1, 'USD', 'MXN', 20.79, '2021-10-05', '9999-12-31')],
    ['id', 'from', 'to', 'rate', 'effectiveAt', 'effectiveTill'])

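First, for currency_exchange_df, create one row for every possible date in each rate's validity range. This is a cheap operation, because each currency pair can have at most 365 rows per year (366 in a leap year).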

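# Parse the start date of each rate; open-ended ranges are capped at the latest transaction date.
eff_at = F.to_date('effectiveAt')
max_date = F.lit(transactions_df.agg(F.max(F.to_date('completedAt'))).head()[0])
# Within each currency pair, a rate stays valid until the day before the next rate starts.
w = W.partitionBy('from', 'to').orderBy(eff_at)
currency_exchange_df = (currency_exchange_df
    .withColumn('effective', F.sequence(eff_at, F.coalesce(F.date_sub(F.lead(eff_at).over(w), 1), max_date)))
    .withColumn('effective', F.explode('effective'))
)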

# +---+----+---+-----+-----------+-------------+----------+
# | id|from| to| rate|effectiveAt|effectiveTill| effective|
# +---+----+---+-----+-----------+-------------+----------+
# |  4| USD|CAD| 1.35| 2021-09-28|   2021-10-04|2021-09-28|
# |  4| USD|CAD| 1.35| 2021-09-28|   2021-10-04|2021-09-29|
# |  4| USD|CAD| 1.35| 2021-09-28|   2021-10-04|2021-09-30|
# |  4| USD|CAD| 1.35| 2021-09-28|   2021-10-04|2021-10-01|
# |  4| USD|CAD| 1.35| 2021-09-28|   2021-10-04|2021-10-02|
# |  4| USD|CAD| 1.35| 2021-09-28|   2021-10-04|2021-10-03|
# |  2| USD|CAD| 1.33| 2021-10-04|   9999-12-31|2021-10-04|
# |  2| USD|CAD| 1.33| 2021-10-04|   9999-12-31|2021-10-05|
# |  2| USD|CAD| 1.33| 2021-10-04|   9999-12-31|2021-10-06|
# |  2| USD|CAD| 1.33| 2021-10-04|   9999-12-31|2021-10-07|
# |  3| USD|MXN|20.44| 2021-09-28|   2021-10-05|2021-09-28|
# |  3| USD|MXN|20.44| 2021-09-28|   2021-10-05|2021-09-29|
# |  3| USD|MXN|20.44| 2021-09-28|   2021-10-05|2021-09-30|
# |  3| USD|MXN|20.44| 2021-09-28|   2021-10-05|2021-10-01|
# |  3| USD|MXN|20.44| 2021-09-28|   2021-10-05|2021-10-02|
# |  3| USD|MXN|20.44| 2021-09-28|   2021-10-05|2021-10-03|
# |  3| USD|MXN|20.44| 2021-09-28|   2021-10-05|2021-10-04|
# |  1| USD|MXN|20.79| 2021-10-05|   9999-12-31|2021-10-05|
# |  1| USD|MXN|20.79| 2021-10-05|   9999-12-31|2021-10-06|
# |  1| USD|MXN|20.79| 2021-10-05|   9999-12-31|2021-10-07|
# +---+----+---+-----+-----------+-------------+----------+
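
To make the expansion logic easier to follow, here is a minimal plain-Python sketch of the same idea, with no Spark involved. The helper name expand_rates is hypothetical and only for illustration; it assumes the input holds the rates of a single currency pair, which is what the window's partitionBy('from', 'to') guarantees in the Spark version.

```python
from datetime import date, timedelta


def expand_rates(rates, max_date):
    """Expand (effective_at, rate) tuples of ONE currency pair into one entry per day.

    Hypothetical helper mirroring the sequence/lead/explode step above.
    """
    ordered = sorted(rates)
    expanded = []
    for i, (start, rate) in enumerate(ordered):
        if i + 1 < len(ordered):
            # Valid until the day before the next rate starts (lead(...) minus 1 day).
            end = ordered[i + 1][0] - timedelta(days=1)
        else:
            # No later rate: cap the range at the latest transaction date.
            end = max_date
        day = start
        while day <= end:
            expanded.append((day, rate))
            day += timedelta(days=1)
    return expanded


usd_cad = [(date(2021, 9, 28), 1.35), (date(2021, 10, 4), 1.33)]
daily = expand_rates(usd_cad, max_date=date(2021, 10, 7))
for day, rate in daily:
    print(day, rate)
```

For the USD to CAD pair this yields six days at 1.35 followed by four days at 1.33, the same ten rows as in the table above.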

Then, create df_rates, which holds rates that convert directly to CAD. A self-join is used for this.

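# Pair every non-CAD rate row ('a') with the USD->CAD rate ('b') that is effective on the same day.
join_on = (F.col('a.effective') == F.col('b.effective')) & (F.col('a.to') != 'CAD')
df_rates = (currency_exchange_df.alias('a')
    .join(currency_exchange_df.filter("from = 'USD' and to = 'CAD'").alias('b'), join_on, 'left')
    .select(
        F.col('a.effective').alias('completedAt'),
        # USD->CAD rows apply to USD transactions; other rows apply to their target currency (here MXN).
        F.when(F.col('a.to') == 'CAD', 'USD').otherwise(F.col('a.to')).alias('currency'),
        # Cross rate b.rate / a.rate (MXN->CAD); USD->CAD rows have no match in 'b' and keep a.rate.
        F.coalesce(F.col('b.rate') / F.col('a.rate'), 'a.rate').alias('rate')
    )
)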

# +-----------+--------+-------------------+
# |completedAt|currency|               rate|
# +-----------+--------+-------------------+
# | 2021-10-02|     USD|               1.35|
# | 2021-10-02|     MXN|0.06604696673189824|
# | 2021-09-30|     USD|               1.35|
# | 2021-09-30|     MXN|0.06604696673189824|
# | 2021-10-05|     USD|               1.33|
# | 2021-10-05|     MXN|0.06397306397306397|
# | 2021-09-28|     USD|               1.35|
# | 2021-09-28|     MXN|0.06604696673189824|
# | 2021-09-29|     USD|               1.35|
# | 2021-09-29|     MXN|0.06604696673189824|
# | 2021-10-03|     USD|               1.35|
# | 2021-10-03|     MXN|0.06604696673189824|
# | 2021-10-06|     USD|               1.33|
# | 2021-10-06|     MXN|0.06397306397306397|
# | 2021-10-04|     USD|               1.33|
# | 2021-10-04|     MXN|0.06506849315068493|
# | 2021-10-01|     USD|               1.35|
# | 2021-10-01|     MXN|0.06604696673189824|
# | 2021-10-07|     USD|               1.33|
# | 2021-10-07|     MXN|0.06397306397306397|
# +-----------+--------+-------------------+
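
The MXN rows above are cross rates: dividing by the USD to MXN rate turns the amount into USD, and multiplying by the USD to CAD rate turns that into CAD. As a small worked example in plain Python (the variable names are just for illustration), using the rates in effect on 2021-10-06 and the single MXN transaction from the input data:

```python
# Rates effective on 2021-10-06, taken from currency_exchange_df above.
usd_to_mxn = 20.79
usd_to_cad = 1.33

# MXN -> USD -> CAD collapses into a single factor: (1 / usd_to_mxn) * usd_to_cad.
mxn_to_cad = usd_to_cad / usd_to_mxn

# Transaction id 3: 621.42 MXN completed on 2021-10-06.
amount_mxn = 621.42
amount_cad = round(amount_mxn * mxn_to_cad, 2)

print(mxn_to_cad)   # roughly 0.06397, matching the MXN rate for 2021-10-06
print(amount_cad)   # 39.75
```

Precomputing this factor per day is what lets the final step use one join and one multiplication, regardless of the transaction's currency.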

Finally, join with transactions_df, then group, pivot and aggregate.

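df_converted = (transactions_df
    # Attach the direct-to-CAD rate for each transaction's date and currency.
    .join(df_rates, ['completedAt', 'currency'], 'left')
    # 'CREDIT' -> 'Credits', 'DEBIT' -> 'Debits' (used as pivot column names).
    .withColumn('types', F.concat(F.initcap('type'), F.lit('s')))
    .groupBy(F.col('completedAt').alias('date'))
    .pivot('types', ['Credits', 'Debits'])
    # Sum the CAD amounts per date and type, rounded to 2 decimals.
    .agg(F.round(F.sum(F.col('amount') * F.col('rate')), 2))
    # Dates without credits (or without debits) get 0 instead of null.
    .fillna(0)
)
df_converted.sort('date').show()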

# +----------+-------+------+
# |      date|Credits|Debits|
# +----------+-------+------+
# |2021-10-01|    0.0|  40.5|
# |2021-10-02|    0.0|  13.5|
# |2021-10-03|  80.97|   0.0|
# |2021-10-04| 106.37|   0.0|
# |2021-10-05|    0.0| 13.29|
# |2021-10-06|  39.75|   0.0|
# |2021-10-07|  47.87|   0.0|
# +----------+-------+------+

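The DataFrame API in Spark keeps getting more powerful. This script did not even need higher-order functions with predicates. If you cannot complete a task with native Spark functionality, you can resort to pandas_udf. Regular udfs are a relic of the past: they are inefficient, and in 99% of cases they can be avoided entirely.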
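
As for the error in the question itself: Spark has to pickle the UDF, together with everything the function refers to, in order to ship it to the executors. convertRate refers to currency_exchange_df and transactions_df, and a DataFrame is a driver-side handle that cannot be pickled; most likely the pickler reaches an object that holds a threading lock, hence the '_thread.RLock' message. The sketch below reproduces the same kind of failure using only the standard library. FakeContext and FakeDataFrame are hypothetical stand-ins, not PySpark classes, and the exact chain of references inside PySpark is an assumption here.

```python
import pickle
import threading


class FakeContext:
    """Hypothetical stand-in for a driver-side object that owns a lock."""

    def __init__(self):
        self._lock = threading.RLock()


class FakeDataFrame:
    """Hypothetical stand-in for a DataFrame that keeps a reference to its context."""

    def __init__(self, context):
        self.context = context


def try_pickle(obj):
    """Return 'ok' if obj can be pickled, otherwise the TypeError message."""
    try:
        pickle.dumps(obj)
        return "ok"
    except TypeError as exc:
        return str(exc)


fake_df = FakeDataFrame(FakeContext())
message = try_pickle(fake_df)
print(message)                       # the message names the RLock object

# Plain values, such as a small lookup dict of rates, pickle without trouble.
print(try_pickle({"CAD": 1.33}))
```

This is why values needed inside a UDF have to be plain Python data (or be joined in as columns, as done above) rather than another DataFrame.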
