如何使单元测试模拟代码在每个spark worker节点(进程)上生效

czfnxgou  于 2021-07-14  发布在  Spark
关注(0)|答案(1)|浏览(377)

我有三个文件:
spark\u mock\u dependency.py提供 user() 方法读取/etc/user,
spark\u mock.py创建一个env类,它使用 user() 方法获取谁是用户。
用于env类的单元测试的spark\u mock\u test.py。
我没有 /etc/user 所以我需要模仿这个方法 user() 假装。但是,单元测试 test_env_without_spark 工作但是 test_env_with_spark 没有。看起来模拟只在驱动节点上起作用,我不能在所有工作节点(进程)上模拟类或方法。请参阅我的代码和下面的错误。
有人知道如何在所有worker节点(进程)上模拟方法吗?
spark\u mock\u依赖.py

def user():
    with open('/etc/user') as f:
        return f.readline().strip()

Spark塞

from pkgname.spark_mock_dependency import user

class Env:
    user = user()

Spark模拟试验.py

import unittest
from unittest.mock import patch
from pyspark import SparkConf, SparkContext

class EnvTest(unittest.TestCase):
    sc = None

    @classmethod
    def setUpClass(cls) -> None:
        conf = SparkConf().setMaster("local[2]").setAppName("testing")
        cls.sc = SparkContext(conf=conf)

    @patch('pkgname.spark_mock_dependency.user')
    def test_env_with_spark(self, user_mocker):
        user_mocker.return_value = 'anyone'

        from pkgname.spark_mock import Env
        rdd = self.__class__.sc.parallelize([1, 2])
        results = rdd.map(lambda: f'{Env.user}').collect()
        self.assertTrue([res == 'anyone' for res in results])

    @patch('pkgname.spark_mock_dependency.hb_user')
    def test_env_without_spark(self, user_mocker):
        user_mocker.return_value = 'anyone'
        from pkgname.spark_mock import Env
        self.assertEqual('anyone', Env.user)

错误消息

py4j.protocol.Py4JJavaError: An error occurred while calling 
z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0) (executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
"lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 586, in main
    func, profiler, deserializer, serializer = read_command(pickleSer, infile)
"lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 69, in read_command
    command = serializer._read_with_length(file)
"lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 160, in _read_with_length
    return self.loads(obj)
"lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 430, in loads
    return pickle.loads(obj, encoding=encoding)
"lib/python3.6/site-packages/pkgname/spark_mock.py", line 4, in <module>
    class Env:
"lib/python3.6/site-packages/pkgname/spark_mock.py", line 5, in Env
    user = user()
"lib/python3.6/site-packages/pkgname/spark_mock_dependency.py", line 2, in hb_user
    with open('/etc/user') as f:
FileNotFoundError: [Errno 2] No such file or directory: '/etc/user'
hfyxw5xn

hfyxw5xn1#

您可以在单元测试中导入用户包并模拟该包本身
检查以下代码:

def test_env_with_spark(self, user_mocker):

    from pkgname.spark_mock import Env, user
    user_mocker.patch("pkgname.spark_mock.user", return_value='anyone')
    rdd = self.__class__.sc.parallelize([1, 2])
    results = rdd.map(lambda: f'{Env.user}').collect()

相关问题