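I have three files:
spark_mock_dependency.py provides a user() method that reads /etc/user.
spark_mock.py creates an Env class, which uses user() to find out who the user is.
spark_mock_test.py contains the unit tests for the Env class.
I don't have /etc/user, so I need to mock the user() method. However, the unit test test_env_without_spark passes but test_env_with_spark does not. It looks like the mock only takes effect on the driver node, and I cannot mock a class or method on all worker nodes (processes). Please see my code and the error below.
Does anyone know how to mock a method on all worker nodes (processes)?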
spark_mock_dependency.py

def user():
    with open('/etc/user') as f:
        return f.readline().strip()
spark_mock.py

from pkgname.spark_mock_dependency import user


class Env:
    user = user()
spark_mock_test.py

import unittest
from unittest.mock import patch

from pyspark import SparkConf, SparkContext


class EnvTest(unittest.TestCase):
    sc = None

    @classmethod
    def setUpClass(cls) -> None:
        conf = SparkConf().setMaster("local[2]").setAppName("testing")
        cls.sc = SparkContext(conf=conf)

    @patch('pkgname.spark_mock_dependency.user')
    def test_env_with_spark(self, user_mocker):
        user_mocker.return_value = 'anyone'
        from pkgname.spark_mock import Env
        rdd = self.__class__.sc.parallelize([1, 2])
        # map() passes each element to the function, so the lambda takes one parameter
        results = rdd.map(lambda _: f'{Env.user}').collect()
        # assertTrue on a non-empty list always passes, so check every element
        self.assertTrue(all(res == 'anyone' for res in results))

    @patch('pkgname.spark_mock_dependency.user')
    def test_env_without_spark(self, user_mocker):
        user_mocker.return_value = 'anyone'
        from pkgname.spark_mock import Env
        self.assertEqual('anyone', Env.user)
Error message
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0) (executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 586, in main
    func, profiler, deserializer, serializer = read_command(pickleSer, infile)
  File "lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 69, in read_command
    command = serializer._read_with_length(file)
  File "lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 160, in _read_with_length
    return self.loads(obj)
  File "lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 430, in loads
    return pickle.loads(obj, encoding=encoding)
  File "lib/python3.6/site-packages/pkgname/spark_mock.py", line 4, in <module>
    class Env:
  File "lib/python3.6/site-packages/pkgname/spark_mock.py", line 5, in Env
    user = user()
  File "lib/python3.6/site-packages/pkgname/spark_mock_dependency.py", line 2, in hb_user
    with open('/etc/user') as f:
FileNotFoundError: [Errno 2] No such file or directory: '/etc/user'
1 Answer
You can import the user package in your unit test and mock the package itself.
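For background on why the patch in the question never reaches the workers: unittest.mock.patch replaces an attribute only inside the process that runs the test, while the traceback shows pkgname.spark_mock being imported afresh inside PySpark's worker.py, which runs in a separate Python process. The snippet below is a minimal sketch of that behaviour using only the standard library. It makes two assumptions for illustration: a child interpreter started with subprocess stands in for a Spark Python worker, and platform.python_version stands in for user(). The helper run_in_fresh_interpreter is hypothetical and not part of PySpark.

```python
import platform
import subprocess
import sys
from unittest.mock import patch


def run_in_fresh_interpreter(code, *args):
    """Run `code` in a new Python process (a stand-in for a Spark Python worker)."""
    completed = subprocess.run(
        [sys.executable, "-c", code, *args],
        stdout=subprocess.PIPE,
        universal_newlines=True,
        check=True,
    )
    return completed.stdout.strip()


# platform.python_version is only a convenient stand-in for user().
with patch("platform.python_version", return_value="anyone"):
    # Inside the patched process the mock is visible.
    seen_by_driver = platform.python_version()

    # The child imports platform afresh, so it calls the real function.
    seen_by_worker = run_in_fresh_interpreter(
        "import platform; print(platform.python_version())"
    )

    # Possible workaround: resolve the value in the patched process and
    # ship it to the child as plain data instead of as code to re-run.
    shipped_to_worker = run_in_fresh_interpreter(
        "import sys; print(sys.argv[1])", seen_by_driver
    )

print(seen_by_driver, seen_by_worker, shipped_to_worker)
```

Applied to the Spark test, the same idea would be to read Env.user on the driver while the patch is active (for example expected = Env.user) and let the lambda capture that string, so the worker does not have to import pkgname.spark_mock at all. This is a suggestion under the assumptions above, not a tested fix for the original project.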
Check the following code: