我有这个功能:
# spark already defined somewhere as:
spark = SparkSession.builder.appName("App").getOrCreate()
def read_data(spark):
query = "SELECT * FROM table"
pandas_df = pd.read_sql(query, conn)
return spark.createDataFrame(pandas_df)
字符串
要测试它:
from unittest import mock
@mock.patch("pandas.read_sql")
@mock.patch("pyspark.sql.SparkSession", autospec=True)
def test_read_data(spark_session, pandas_read_sql):
result = read_data(spark_session)
assert == ???
型
我应该用什么方法来测试这是有意义的?任何帮助都很感激。
1条答案
按热度按时间8yoxcaq71#
为了测试你的函数,你只需要模拟
pandas.read_sql
,spark_session
不能被模拟,你需要有一个示例来正确测试你的函数。你可以创建你自己的pytest.fixture
来满足这个要求。字符串
你可以做更多的Assert,检查创建的框架是否有正确的模式,是否包含你期望的值。
提示:您应该查看spark sql的功能,因为您可能不需要使用pandas来查询数据库。