我使用一个fixture来创建一个spark会话,该会话将在一个文件中的各种pytest测试之间共享,如下所示
'''
@pytest.fixture(scope="module")
def spark() -> SparkSession:
builder = SparkSession.builder \
.appName('we_pipeline_test') \
.config('spark.ui.showConsoleProgress', 'false') \
.config('spark.sql.debug.maxToStringFields', '200')
if 'SPARK_MASTER' in os.environ:
builder = builder.master(os.environ['SPARK_MASTER'])
else:
builder = builder.master('local[1]')
s = builder.getOrCreate()
with tempfile.TemporaryDirectory("we_pipeline") as dir:
s.sparkContext.setCheckpointDir(dir)
yield s
s.stop()
def test_func1(spark):
....
....
df = spark.createDataFrame(data,Schema)
....
.....
'''
字符串
我需要在另一个测试函数中访问这个dataframe df
'''
def test_func2(spark):
df.show()
'''
型
A如何才能做到这一点?
2条答案
按热度按时间bf1o4zei1#
你就快成功了。
你需要的是创建你的dataframe作为一个夹具太.在pytest中,测试函数是独立的,这意味着一个测试不应该依赖于例如。另一个测试的结果。
但是,如果您有一个操作数据的公共流程,并且在几个测试函数之间共享,那么您可以将其放入一个fixture中。所有的测试数据框准备都应该在这里完成。
字符串
然后,您可以在测试函数中使用fixture_data_frame。因为你可能要使用spark功能,所以你也需要spark SparkSession:
型
编辑:OP评论说,他希望在test_func1中发生的操作在test_func2中可用,而不是作为一个单独的fixture。虽然我认为设计明智的这是错误的,这是一种方法,你可以接近它。您可以通过将第一个测试也转换为一个fixture并在第二个测试中引用它来实现。
型
同样重要的是要澄清,将测试转换为fixture有点违背了常见的测试设计原则,其中测试应该是独立的,并且应该能够以任何顺序运行。
au9on6nz2#
我通过将所有测试函数 Package 在TestClass中并将dataframe创建为类变量来解决它。