未创建Docker项目测试中的pytest临时文件夹

xxhby3vn  于 2023-05-16  发布在  Docker
关注(0)|答案(1)|浏览(104)

错误是:
属性错误:“function”对象没有属性“_get_object_id”

  • test_functions.py* 中的相关代码为:
import urllib.request as urllib
import os
import pandas as pd
import pyspark.sql.functions as psf
from pyspark.sql import SparkSession
from src.functions import save, write, query
import pytest

global spark

spark = SparkSession.builder \
    .master("local") \
    .appName("load_parquet") \
    .config("spark.jars", "/opt/spark/jars/postgresql-42.2.5.jar") \
    .getOrCreate()

@pytest.fixture(scope="session")
def cache_dir(tmp_path_factory):
    return tmp_path_factory.mktemp("files") / "test.parquet"

def test_save(url='https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet', filename=cache_dir):
    print(cache_dir)
    assert save(url, filename), "Error saving file"

def test_write(write_method='psycopg2'):

    df = spark.read.parquet(cache_dir)
    if 'filename' not in df.columns:
        df = df.withColumn('filename', psf.lit('20-03'))

.....................

我做错了什么?我花了很多时间想让这件事成功。
我已经尝试了几个谷歌搜索的页面和我的不同方法,使这项工作的价值。没有一个。我希望在第一个上传递Assert,如果第二个可以读取文件,它也会通过。
代码以如下结尾:

#%% Run tests
if __name__ == "__main__":
    print(cache_dir)
    test_save()
    test_write()
    test_query()
    test_percentile()
    print("Everything passed")
# %%

在Magus回复后,测试保存不工作,只有 test_write 有问题:

def test_write(cache_dir):
    write_method = 'psycopg2'
    url = 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet'
    save(url, cache_dir)

    df = spark.read.parquet(cache_dir)
    if 'filename' not in df.columns:
        df = df.withColumn('filename', psf.lit('20-03'))
2023-05-03 13:42:13 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
2023-05-03 13:42:13
2023-05-03 13:42:13 parameter = PosixPath('/tmp/pytest-of-root/pytest-0/files0/test.parquet')
2023-05-03 13:42:13 python_proxy_pool = <py4j.java_gateway.PythonProxyPool object at 0x7ff238c2b580>
2023-05-03 13:42:13
2023-05-03 13:42:13     def get_command_part(parameter, python_proxy_pool=None):
2023-05-03 13:42:13         """Converts a Python object into a string representation respecting the
2023-05-03 13:42:13         Py4J protocol.
2023-05-03 13:42:13
2023-05-03 13:42:13         For example, the integer `1` is converted to `u"i1"`
2023-05-03 13:42:13
2023-05-03 13:42:13         :param parameter: the object to convert
2023-05-03 13:42:13         :rtype: the string representing the command part
2023-05-03 13:42:13         """
2023-05-03 13:42:13         command_part = ""
2023-05-03 13:42:13
2023-05-03 13:42:13         if parameter is None:
2023-05-03 13:42:13             command_part = NULL_TYPE
2023-05-03 13:42:13         elif isinstance(parameter, bool):
2023-05-03 13:42:13             command_part = BOOLEAN_TYPE + smart_decode(parameter)
2023-05-03 13:42:13         elif isinstance(parameter, Decimal):
2023-05-03 13:42:13             command_part = DECIMAL_TYPE + smart_decode(parameter)
2023-05-03 13:42:13         elif isinstance(parameter, int) and parameter <= JAVA_MAX_INT\
2023-05-03 13:42:13                 and parameter >= JAVA_MIN_INT:
2023-05-03 13:42:13             command_part = INTEGER_TYPE + smart_decode(parameter)
2023-05-03 13:42:13         elif isinstance(parameter, long) or isinstance(parameter, int):
2023-05-03 13:42:13             command_part = LONG_TYPE + smart_decode(parameter)
2023-05-03 13:42:13         elif isinstance(parameter, float):
2023-05-03 13:42:13             command_part = DOUBLE_TYPE + encode_float(parameter)
2023-05-03 13:42:13         elif isbytearray(parameter):
2023-05-03 13:42:13             command_part = BYTES_TYPE + encode_bytearray(parameter)
2023-05-03 13:42:13         elif ispython3bytestr(parameter):
2023-05-03 13:42:13             command_part = BYTES_TYPE + encode_bytearray(parameter)
2023-05-03 13:42:13         elif isinstance(parameter, basestring):
2023-05-03 13:42:13             command_part = STRING_TYPE + escape_new_line(parameter)
2023-05-03 13:42:13         elif is_python_proxy(parameter):
2023-05-03 13:42:13             command_part = PYTHON_PROXY_TYPE + python_proxy_pool.put(parameter)
2023-05-03 13:42:13             for interface in parameter.Java.implements:
2023-05-03 13:42:13                 command_part += ";" + interface
2023-05-03 13:42:13         else:
2023-05-03 13:42:13 >           command_part = REFERENCE_TYPE + parameter._get_object_id()
2023-05-03 13:42:13 E           AttributeError: 'PosixPath' object has no attribute '_get_object_id'
2023-05-03 13:42:13
2023-05-03 13:42:13 /usr/local/lib/python3.10/dist-packages/py4j/protocol.py:298: AttributeError
2023-05-03 13:42:13 ----------------------------- Captured stdout call -----------------------------

经过一些测试,似乎在spark使用临时文件夹的配置中存在一些问题。切换到Pandas读取文件夹中的文件工作。

xe55xuns

xe55xuns1#

以下是你做错的地方:

  • 临时文件夹是短暂的,在使用它的进程退出后会被清理。您需要使用pytest的setup/teardown或 yield 保持 mktemp 上下文打开
  • fixture应该在test参数中声明为依赖项。
  • 测试之间不应相互依赖。测试应该是无状态的,或者通过使用fixture创建所需的状态。在您的例子中,您应该有一个fixture来创建test_read试图访问的文件,并且不依赖于另一个测试的结果。

对tmp_path fixture使用 yield

@pytest.fixture(scope="session")
def cache_dir(tmp_path_factory):
   #creates a context
   with tmp_path_factory.mktemp("files") as f
       #yields the file path, holds the context open
       yield f / "test.parquet"

def test_save(cache_dir):
    url='https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet'
    print(cache_dir)
    assert save(url,cache_dir), "Error saving file"

相关问题