这些转换函数中的一些尊重mock,另一些不尊重,我不知道为什么。
这里有一个名为etl_job.py
的文件,它包含各种转换函数,这些函数使用rdd.map
将一列添加到DataFrame
中,并使用从dependencies.utils
导入的get_random_bool
函数。
# etl_job.py
from dependencies.utils import get_random_bool
def transform_data1(df):
return df.rdd.map(lambda row: row+(get_random_bool(),)).toDF()
def transform_data2(df):
g = lambda row: row+(get_random_bool(),)
return df.rdd.map(lambda row: g(row)).toDF()
def transform_data3(df):
g = lambda row: row+(get_random_bool(),)
h = lambda row: g(row)
return df.rdd.map(lambda row: h(row)).toDF()
def transform_data4(df):
return df.rdd.map(lambda row: f(row)).toDF()
def transform_data5(df):
g = lambda row: f(row)
return df.rdd.map(lambda row: g(row)).toDF()
def f(row):
return row+(get_random_bool(),)
字符串
此测试文件尝试修补jobs.etl_job.py
中导入的get_random_bool
函数。
from pyspark.sql import SparkSession
from unittest.mock import patch
from jobs.etl_job import transform_data1, transform_data2, transform_data3, transform_data4, transform_data5
# Create SparkSession
spark = SparkSession.builder \
.master('local[1]') \
.appName('Time Tests') \
.getOrCreate()
spark.sparkContext.setLogLevel("WARN")
df = spark.createDataFrame([["1"],["2"]])
print('original dataframe')
df.show()
with patch('jobs.etl_job.get_random_bool') as f:
f.return_value = 'notabool'
df_t = transform_data1(df)
print('with transform_data1')
df_t.show()
df_t = transform_data2(df)
print('with transform_data2')
df_t.show()
df_t = transform_data3(df)
print('with transform_data3')
df_t.show()
df_t = transform_data4(df)
print('with transform_data4')
df_t.show()
df_t = transform_data5(df)
print('with transform_data5')
df_t.show()
型
这是输出。
original dataframe
+---+
| _1|
+---+
| 1|
| 2|
+---+
with transform_data1
+---+--------+
| _1| _2|
+---+--------+
| 1|notabool|
| 2|notabool|
+---+--------+
with transform_data2
+---+--------+
| _1| _2|
+---+--------+
| 1|notabool|
| 2|notabool|
+---+--------+
with transform_data3
+---+--------+
| _1| _2|
+---+--------+
| 1|notabool|
| 2|notabool|
+---+--------+
with transform_data4
+---+----+
| _1| _2|
+---+----+
| 1|true|
| 2|true|
+---+----+
with transform_data5
+---+-----+
| _1| _2|
+---+-----+
| 1|false|
| 2|false|
+---+-----+
型
前三个转换工作正常--添加所有值都是notabool
的模拟列--但第四个和第五个转换不正常--它们添加了一列布尔值而不是模拟值。如果模拟函数在transform函数内调用,或者如果模拟函数由transform函数的内部函数调用,则它工作正常;但如果transform函数调用调用被模拟函数的外部函数,则使用实际的非模拟函数。
有人能解释这种行为吗?
1条答案
按热度按时间uemypmqf1#
我知道这是一个老问题,但这个答案可能会帮助其他遇到同样问题的人。
我花了很多时间对这个问题感到困惑。事实证明,由于某种原因,mocking不能很好地与Spark并行线程一起工作。记住,
map()
函数中的lambda在executor上运行,而不是驱动程序,即在不同的线程/进程上。因此mocking必须在executor线程的上下文中发生,而不是驱动程序线程。我让它正常工作的唯一方法是将lambda Package 在一个可以模拟的函数中,然后在测试中,用一个执行修补并调用原始函数的函数来模拟该函数(这应该发生在工作线程的上下文中)。
字符串
测试文件:
型