I am running a pyspark test with pytest for a word-count example. I have three files:
File 1: conftest.py:
import logging

import pytest
from pyspark import SparkConf, SparkContext


def quiet_py4j():
    # Silence py4j's chatty INFO-level logging during the test run
    logger = logging.getLogger('py4j')
    logger.setLevel(logging.WARN)


@pytest.fixture(scope="session")
def spark_context(request):
    conf = SparkConf().setMaster("local[*]").setAppName("pytest-pyspark-testing")
    # The lambda only looks up sc at teardown time, so binding sc below is fine
    request.addfinalizer(lambda: sc.stop())
    sc = SparkContext(conf=conf)
    quiet_py4j()
    return sc
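(For reference, the same session-scoped fixture can also be written in pytest's yield style, which puts the teardown after the setup instead of registering a callback. A minimal sketch, assumed to behave the same as the addfinalizer version above:)

import logging

import pytest
from pyspark import SparkConf, SparkContext


@pytest.fixture(scope="session")
def spark_context():
    # Same configuration as the addfinalizer version above
    conf = SparkConf().setMaster("local[*]").setAppName("pytest-pyspark-testing")
    sc = SparkContext(conf=conf)
    logging.getLogger('py4j').setLevel(logging.WARN)
    yield sc  # hand the context to the tests
    sc.stop()  # runs after the last test in the session finishes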
File 2: wordcount.py:
def do_word_counts(lines):
    counts = (lines.flatMap(lambda x: x.split())
                   .map(lambda x: (x, 1))
                   .reduceByKey(lambda x, y: x + y))
    results = {word: count for word, count in counts.collect()}
    return results
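(do_word_counts takes an RDD of text lines and returns a plain dict of word frequencies. A quick standalone check outside pytest, a sketch assuming a local SparkContext, would be:)

from pyspark import SparkConf, SparkContext

import wordcount

# Hypothetical standalone check, independent of the pytest fixture
sc = SparkContext(conf=SparkConf().setMaster("local[*]").setAppName("wc-check"))
rdd = sc.parallelize(["hello spark", "hello again spark spark"])
print(wordcount.do_word_counts(rdd))  # should map hello -> 2, spark -> 3, again -> 1
sc.stop()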
File 3: wordcount_test.py:
import pytest

import wordcount

pytestmark = pytest.mark.usefixtures("spark_context")


def test_do_word_counts(spark_context):
    test_input = [' hello spark ', ' hello again spark spark']
    input_rdd = spark_context.parallelize(test_input, 1)
    results = wordcount.do_word_counts(input_rdd)
    expected_results = {'hello': 2, 'spark': 3, 'again': 1}
    assert results == expected_results
After running pytest wordcount_test.py, the following error occurs:
request = <SubRequest 'spark_context' for <Function test_do_word_counts>>
@pytest.fixture(scope="session")
def spark_context(request):
conf = (SparkConf().setMaster("local[*]").setAppName("pytest-pyspark-testing"))
#request.addfinalizer(lambda: sc.stop())
> sc = SparkContext(conf=conf)
conftest.py:15:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../anaconda3/lib/python3.8/site-packages/pyspark/context.py:146: in __init__
self._do_init(master, appName, sparkHome, pyFiles, environment, batchSize, serializer,
../../anaconda3/lib/python3.8/site-packages/pyspark/context.py:226: in _do_init
str(self._jvm.PythonUtils.getPythonAuthSocketTimeout(self._jsc))
Any ideas on how to fix this? Thanks.