transactions_df is the DataFrame I am running the UDF on. Inside the UDF, I reference another DataFrame to pull values from it based on certain conditions.
def convertRate(row):
    completed = row["completedAt"]
    currency = row["currency"]
    amount = row["amount"]
    if currency == "MXN":
        rate = currency_exchange_df.select("rate").where((transactions_df.to == "MXN") & (completed >= col("effectiveAt")) & (completed < col("effectiveTill")))
        amount = amount / rate
    final_rate = currency_exchange_df.select("rate").where((transactions_df.to == "CAD") & (completed >= col("effectiveAt")) & (completed < col("effectiveTill")))
    converted = amount * final_rate
    return converted

convertUDF = f.udf(lambda row: convertRate(row), DoubleType())
To call the UDF, I pass the row as a struct:
temp = transactions_df.withColumn("newAmount", convertUDF(f.struct([transactions_df[x] for x in transactions_df.columns])))
temp.show()
I get the following error:
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
File ~\AppData\Local\Programs\Python\Python310\lib\site-packages\pyspark\serializers.py:437, in CloudPickleSerializer.dumps(self, obj)
436 try:
--> 437 return cloudpickle.dumps(obj, pickle_protocol)
438 except pickle.PickleError:
File ~\AppData\Local\Programs\Python\Python310\lib\site-packages\pyspark\cloudpickle\cloudpickle_fast.py:72, in dumps(obj, protocol, buffer_callback)
69 cp = CloudPickler(
70 file, protocol=protocol, buffer_callback=buffer_callback
71 )
---> 72 cp.dump(obj)
73 return file.getvalue()
File ~\AppData\Local\Programs\Python\Python310\lib\site-packages\pyspark\cloudpickle\cloudpickle_fast.py:540, in CloudPickler.dump(self, obj)
539 try:
--> 540 return Pickler.dump(self, obj)
541 except RuntimeError as e:
TypeError: cannot pickle '_thread.RLock' object
During handling of the above exception, another exception occurred:
PicklingError Traceback (most recent call last)
Input In [40], in <cell line: 1>()
----> 1 temp = transactions_df.withColumn("newAmount", convertUDF(f.struct([transactions_df[x] for x in transactions_df.columns])))
2 temp.show()
File ~\AppData\Local\Programs\Python\Python310\lib\site-packages\pyspark\sql\udf.py:199, in UserDefinedFunction._wrapped.<locals>.wrapper(*args)
197 @functools.wraps(self.func, assigned=assignments)
198 def wrapper(*args):
--> 199 return self(*args)
File ~\AppData\Local\Programs\Python\Python310\lib\site-packages\pyspark\sql\udf.py:177, in UserDefinedFunction.__call__(self, *cols)
176 def __call__(self, *cols):
--> 177 judf = self._judf
178 sc = SparkContext._active_spark_context
179 return Column(judf.apply(_to_seq(sc, cols, _to_java_column)))
File ~\AppData\Local\Programs\Python\Python310\lib\site-packages\pyspark\sql\udf.py:161, in UserDefinedFunction._judf(self)
154 @property
155 def _judf(self):
156 # It is possible that concurrent access, to newly created UDF,
157 # will initialize multiple UserDefinedPythonFunctions.
158 # This is unlikely, doesn't affect correctness,
159 # and should have a minimal performance impact.
160 if self._judf_placeholder is None:
--> 161 self._judf_placeholder = self._create_judf()
162 return self._judf_placeholder
File ~\AppData\Local\Programs\Python\Python310\lib\site-packages\pyspark\sql\udf.py:170, in UserDefinedFunction._create_judf(self)
167 spark = SparkSession.builder.getOrCreate()
168 sc = spark.sparkContext
--> 170 wrapped_func = _wrap_function(sc, self.func, self.returnType)
171 jdt = spark._jsparkSession.parseDataType(self.returnType.json())
172 judf = sc._jvm.org.apache.spark.sql.execution.python.UserDefinedPythonFunction(
173 self._name, wrapped_func, jdt, self.evalType, self.deterministic)
File ~\AppData\Local\Programs\Python\Python310\lib\site-packages\pyspark\sql\udf.py:34, in _wrap_function(sc, func, returnType)
32 def _wrap_function(sc, func, returnType):
33 command = (func, returnType)
---> 34 pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
35 return sc._jvm.PythonFunction(bytearray(pickled_command), env, includes, sc.pythonExec,
36 sc.pythonVer, broadcast_vars, sc._javaAccumulator)
File ~\AppData\Local\Programs\Python\Python310\lib\site-packages\pyspark\rdd.py:2814, in _prepare_for_python_RDD(sc, command)
2811 def _prepare_for_python_RDD(sc, command):
2812 # the serialized command will be compressed by broadcast
2813 ser = CloudPickleSerializer()
-> 2814 pickled_command = ser.dumps(command)
2815 if len(pickled_command) > sc._jvm.PythonUtils.getBroadcastThreshold(sc._jsc): # Default 1M
2816 # The broadcast will have same life cycle as created PythonRDD
2817 broadcast = sc.broadcast(pickled_command)
File ~\AppData\Local\Programs\Python\Python310\lib\site-packages\pyspark\serializers.py:447, in CloudPickleSerializer.dumps(self, obj)
445 msg = "Could not serialize object: %s: %s" % (e.__class__.__name__, emsg)
446 print_exec(sys.stderr)
--> 447 raise pickle.PicklingError(msg)
PicklingError: Could not serialize object: TypeError: cannot pickle '_thread.RLock' object
Sample DataFrames are shown below. The first DF is my transactions_df, and the second DF contains the exchange rates.
The transactions provided are in USD or Mexican Pesos, and the currency exchange data only contains an "effectiveAt" date. Assume that a rate stays in effect until a new record specifying that rate is provided.
I have to convert all transactions to CAD. Note that MXN must first be converted to USD, and then USD to CAD.
The third DF is the expected solution.
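To make the two-step conversion concrete, here is a tiny worked example with invented rates (the real ones come from currency_exchange_df):

# Invented rates, for illustration only: 1 USD = 20.0 MXN, 1 USD = 1.35 CAD.
amount_mxn = 1000.0
amount_usd = amount_mxn / 20.0   # 50.0 USD
amount_cad = amount_usd * 1.35   # 67.5 CAD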
1 Answer

ehxuflar1:
Your initial script should perform the conversion. But in the screenshots we can see that other transformations are performed as well: grouping, aggregation, pivoting. All of that seems like a lot for one question; I did it only to show that it really is feasible using DataFrames alone. The code may look bigger, but this way it is more manageable and efficient.
Input:
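The inputs in the original answer were screenshots; below is a minimal stand-in, with column names taken from the question and values invented purely for illustration:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Invented sample rows; column names match those used in the question.
transactions_df = spark.createDataFrame(
    [("2022-03-05", "MXN", 1000.0),
     ("2022-03-06", "USD", 50.0)],
    ["completedAt", "currency", "amount"])

# The exchange data only carries an effectiveAt date; a rate is assumed
# to hold until a newer record appears.
currency_exchange_df = spark.createDataFrame(
    [("USD", "MXN", 20.0, "2022-01-01"),
     ("USD", "CAD", 1.30, "2022-01-01"),
     ("USD", "CAD", 1.35, "2022-03-01")],
    ["from", "to", "rate", "effectiveAt"])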
First, for currency_exchange_df, create one row for every possible date in the range. This is a cheap operation, since each currency pair can have at most 365 rows per year. Then build df_rates, which holds rates that can be used to convert directly to CAD; a self-join is used for this. Finally, join transactions_df, then group, pivot, and aggregate. The DataFrame API in Spark keeps getting more powerful; this script does not even need higher-order functions with predicates.
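The answer's own code is not reproduced above, so here is a minimal sketch of the three steps it describes, assuming the sample inputs sketched under "Input:" (the column names from, to, rate, effectiveAt and the closing horizon date are my assumptions):

from pyspark.sql import functions as f
from pyspark.sql.window import Window

# Step 1: explode each rate record into one row per date it is effective.
# A rate applies until the next record for the same currency pair.
w = Window.partitionBy("from", "to").orderBy("effectiveAt")
daily_rates = (
    currency_exchange_df
    .withColumn("nextAt", f.coalesce(f.lead("effectiveAt").over(w),
                                     f.lit("2023-01-01")))  # assumed horizon
    .withColumn("date", f.explode(f.sequence(
        f.to_date("effectiveAt"), f.date_sub(f.to_date("nextAt"), 1))))
    .select("from", "to", "rate", "date"))

# Step 2: self-join the daily table with itself (written here as two
# filtered views) to get per-date rates that convert directly to CAD.
usd_cad = (daily_rates.filter("`from` = 'USD' AND `to` = 'CAD'")
           .select("date", f.col("rate").alias("usd_cad")))
usd_mxn = (daily_rates.filter("`from` = 'USD' AND `to` = 'MXN'")
           .select("date", f.col("rate").alias("usd_mxn")))
df_rates = usd_cad.join(usd_mxn, "date").select(
    "date",
    f.col("usd_cad").alias("USD"),                        # USD -> CAD
    (f.col("usd_cad") / f.col("usd_mxn")).alias("MXN"))   # MXN -> CAD

# Step 3: join transactions to the applicable daily rate and convert.
converted = (
    transactions_df
    .join(df_rates, f.to_date("completedAt") == f.col("date"), "left")
    .withColumn("newAmount", f.col("amount") * f.when(
        f.col("currency") == "MXN", f.col("MXN")).otherwise(f.col("USD"))))
# Any grouping / pivoting / aggregation can then be done on `converted`.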
If you cannot accomplish a task with native Spark functionality, you can fall back to pandas_udf. Using regular udfs is a relic of the past: they are very inefficient, and in 99% of cases they are entirely avoidable.
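For completeness, a sketch of what such a pandas_udf fallback could look like. The key point is collecting the (small) rate table to the driver as a plain pandas DataFrame first, because referencing a Spark DataFrame inside the function is exactly what raised the PicklingError above; date handling is omitted here to keep it short:

import pandas as pd
from pyspark.sql import functions as f
from pyspark.sql.types import DoubleType

# Collect the small rate table once; plain pandas objects pickle fine.
rates_pdf = currency_exchange_df.toPandas()

@f.pandas_udf(DoubleType())
def convert_to_cad(currency: pd.Series, amount: pd.Series) -> pd.Series:
    # Simplified: take the latest rate per target currency.
    usd_mxn = rates_pdf.loc[rates_pdf["to"] == "MXN", "rate"].iloc[-1]
    usd_cad = rates_pdf.loc[rates_pdf["to"] == "CAD", "rate"].iloc[-1]
    usd = amount.where(currency != "MXN", amount / usd_mxn)
    return usd * usd_cad

temp = transactions_df.withColumn(
    "newAmount", convert_to_cad(f.col("currency"), f.col("amount")))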