PicklingError when unit testing a PySpark Transformer

fquxozlt · posted 2021-05-24 in Spark

I am trying to unit test a PySpark Transformer class. This particular transformer calls into a third-party library. The library returns an object, which I want to map into a new column of the DataFrame (as a struct).
In this test I mock the third-party library and inject the dependency into my transformer through its constructor.
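For context, here is a minimal sketch of what a transformer like this might look like (the real MyTransformer is not shown in the question, so the names and types below are assumptions; the traceback suggests the transform applies a UDF that closes over the injected library):

from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType


class MyTransformer(Transformer, HasInputCol, HasOutputCol):
    def __init__(self, library):
        super().__init__()
        self._library = library  # injected third-party dependency

    def setInputCol(self, value):
        return self._set(inputCol=value)

    def setOutputCol(self, value):
        return self._set(outputCol=value)

    def _transform(self, df):
        # the UDF closure captures self._library, so Spark has to pickle it
        # (StringType is a placeholder; the real return type would be a struct)
        do_something = udf(lambda v: str(self._library.do_something(v)), StringType())
        return df.withColumn(self.getOutputCol(),
                             do_something(col(self.getInputCol())))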
Here is my unit test (note: self.spark holds the SparkSession for this test):

# module-level imports assumed: from unittest.mock import Mock
def test_transform_expect_new_column(self):

    # create the mock object returned by the library
    my_mock_object = Mock(spec=MyObject)

    # mock the 3rd party library
    library = Mock(spec=ThirdPartyLibrary)
    library.do_something.return_value = my_mock_object

    # build a single-row dataframe for testing
    # (note the trailing comma: ("input value",) must be a tuple, not a bare string)
    input_df = self.spark.createDataFrame(
        [
            ("input value",)
        ],
        ['input_col']
    )

    # transform the dataframe
    transformer = MyTransformer(library) \
        .setInputCol("input_col") \
        .setOutputCol("my_output_col")
    result_df = transformer.transform(input_df)

When this unit test runs, I get the following error, which appears to be caused by my mock object having the wrong class. Is there a way to structure this test so that it avoids this error?
Thanks!

======================================================================
ERROR: test_transform_expect_new_column (my_transformers.tests.test_transformers.MyTransformerTests)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/user/my_transformers/my_transformers/tests/test_transformers.py", line 43, in test_transform_expect_new_column
    result_df = transformer.transform(input_df)
  File "/home/user/my_transformers/my_transformers/transformer.py", line 160, in transform
    col(self.getInputCol())))
  File "/usr/local/lib/python3.7/site-packages/pyspark/sql/udf.py", line 189, in wrapper
    return self(*args)
  File "/usr/local/lib/python3.7/site-packages/pyspark/sql/udf.py", line 167, in __call__
    judf = self._judf
  File "/usr/local/lib/python3.7/site-packages/pyspark/sql/udf.py", line 151, in _judf
    self._judf_placeholder = self._create_judf()
  File "/usr/local/lib/python3.7/site-packages/pyspark/sql/udf.py", line 160, in _create_judf
    wrapped_func = _wrap_function(sc, self.func, self.returnType)
  File "/usr/local/lib/python3.7/site-packages/pyspark/sql/udf.py", line 35, in _wrap_function
    pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
  File "/usr/local/lib/python3.7/site-packages/pyspark/rdd.py", line 2420, in _prepare_for_python_RDD
    pickled_command = ser.dumps(command)
  File "/usr/local/lib/python3.7/site-packages/pyspark/serializers.py", line 590, in dumps
    return cloudpickle.dumps(obj, 2)
  File "/usr/local/lib/python3.7/site-packages/pyspark/cloudpickle.py", line 863, in dumps
    cp.dump(obj)
  File "/usr/local/lib/python3.7/site-packages/pyspark/cloudpickle.py", line 260, in dump
    return Pickler.dump(self, obj)
  File "/usr/lib64/python3.7/pickle.py", line 437, in dump
    self.save(obj)
  File "/usr/lib64/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python3.7/pickle.py", line 774, in save_tuple
    save(element)
  File "/usr/lib64/python3.7/pickle.py", line 549, in save
    self.save_reduce(obj=obj, *rv)
  File "/usr/lib64/python3.7/pickle.py", line 662, in save_reduce
    save(state)
  File "/usr/lib64/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python3.7/pickle.py", line 859, in save_dict
    self._batch_setitems(obj.items())
  File "/usr/lib64/python3.7/pickle.py", line 885, in _batch_setitems
    save(v)
  File "/usr/lib64/python3.7/pickle.py", line 549, in save
    self.save_reduce(obj=obj, *rv)
  File "/usr/lib64/python3.7/pickle.py", line 631, in save_reduce
    "args[0] from __newobj__ args has the wrong class")
_pickle.PicklingError: args[0] from __newobj__ args has the wrong class
