如何在文件中的不同pytest测试函数之间共享spark dataframe

a11xaf1n  于 2023-08-06  发布在  Apache
关注(0)|答案(2)|浏览(123)

我使用一个fixture来创建一个spark会话,该会话将在一个文件中的各种pytest测试之间共享,如下所示
'''

@pytest.fixture(scope="module")
def spark() -> SparkSession:
    builder = SparkSession.builder \
        .appName('we_pipeline_test') \
        .config('spark.ui.showConsoleProgress', 'false') \
        .config('spark.sql.debug.maxToStringFields', '200')

    if 'SPARK_MASTER' in os.environ:
        builder = builder.master(os.environ['SPARK_MASTER'])
    else:
        builder = builder.master('local[1]')

    s = builder.getOrCreate()

    with tempfile.TemporaryDirectory("we_pipeline") as dir:
        s.sparkContext.setCheckpointDir(dir)
        yield s
        s.stop()
def test_func1(spark):
....
....
df = spark.createDataFrame(data,Schema)
....
.....
'''

字符串
我需要在另一个测试函数中访问这个dataframe df

'''
def test_func2(spark):
 df.show()
'''


A如何才能做到这一点?

bf1o4zei

bf1o4zei1#

你就快成功了。
你需要的是创建你的dataframe作为一个夹具太.在pytest中,测试函数是独立的,这意味着一个测试不应该依赖于例如。另一个测试的结果。
但是,如果您有一个操作数据的公共流程,并且在几个测试函数之间共享,那么您可以将其放入一个fixture中。所有的测试数据框准备都应该在这里完成。

@pytest.fixture(scope="module")
def fixture_data_frame(spark: SparkSession) -> DataFrame:
    data = # Your data here
    schema = # Your schema here
    df = spark.createDataFrame(data, schema)
    df = df.filter( ... ) # data manipulation

    yield df

字符串
然后,您可以在测试函数中使用fixture_data_frame。因为你可能要使用spark功能,所以你也需要spark SparkSession:

def test_func1(spark, fixture_data_frame):
    # Perform your tests here using the spark session and fixture_data_frame
    assert 1=1 # Perform your testing

def test_func2(spark, fixture_data_frame):
    # Perform other tests here 
    assert 1=1 # Perform your testing


编辑:OP评论说,他希望在test_func1中发生的操作在test_func2中可用,而不是作为一个单独的fixture。虽然我认为设计明智的这是错误的,这是一种方法,你可以接近它。您可以通过将第一个测试也转换为一个fixture并在第二个测试中引用它来实现。

@pytest.fixture(scope="module")
def test_func1(spark, fixture_data_frame):
    # Perform your manipulations here
    manipulated_df = fixture_data_frame.filter(...)  # example manipulation
    # Perform your tests here
    assert manipulated_df.count() > 0
    yield manipulated_df

def test_func2(spark, test_func1):
    # test_func1 is now both the manipulated data_frame and the test itself.
    # You can perform additional tests here


同样重要的是要澄清,将测试转换为fixture有点违背了常见的测试设计原则,其中测试应该是独立的,并且应该能够以任何顺序运行。

au9on6nz

au9on6nz2#

我通过将所有测试函数 Package 在TestClass中并将dataframe创建为类变量来解决它。

相关问题