pandas 如何测试此功能?

uidvcgyl  于 11个月前  发布在  其他
关注(0)|答案(1)|浏览(76)

我有这个功能:

# spark already defined somewhere as:
spark = SparkSession.builder.appName("App").getOrCreate()

def read_data(spark):
    query = "SELECT * FROM table"
    pandas_df = pd.read_sql(query, conn)
    return spark.createDataFrame(pandas_df)

字符串
要测试它:

from unittest import mock

@mock.patch("pandas.read_sql")
@mock.patch("pyspark.sql.SparkSession", autospec=True)
def test_read_data(spark_session, pandas_read_sql):
    result = read_data(spark_session)
    assert == ???


我应该用什么方法来测试这是有意义的?任何帮助都很感激。

8yoxcaq7

8yoxcaq71#

为了测试你的函数,你只需要模拟pandas.read_sqlspark_session不能被模拟,你需要有一个示例来正确测试你的函数。你可以创建你自己的pytest.fixture来满足这个要求。

from unittest.mock import patch

import pandas
import pyspark.sql
import pytest
from pyspark.sql import SparkSession

from your_module import read_data

@pytest.fixture
def spark_session():
    _spark_session = SparkSession.builder.appName("unit-tests").getOrCreate()
    yield _spark_session
    _spark_session.stop()

@patch("pandas.read_sql")
def test_read_data(mock_read_sql, spark_session):
    # given:
    mock_read_sql.return_value = pandas.DataFrame(
        [(1, "row1"), (2, "row2")], columns=["id", "column1"]
    )

    # when:
    spark_df = read_data(spark_session)

    # then:
    assert isinstance(spark_df, pyspark.sql.DataFrame)

字符串
你可以做更多的Assert,检查创建的框架是否有正确的模式,是否包含你期望的值。
提示:您应该查看spark sql的功能,因为您可能不需要使用pandas来查询数据库。

相关问题