我正在为spark代码编写单元测试用例,该代码从hdfs文件和spark的目录读/写数据。为此,我创建了一个单独的trait,它提供minidfs集群的初始化,我将生成的hdfs uri值用于- spark.sql.warehouse.dir
创建sparksession对象时。这是密码-
trait TestSparkSession extends BeforeAndAfterAll {
self: Suite =>
var hdfsCluster: MiniDFSCluster = _
def nameNodeURI: String = s"hdfs://localhost:${hdfsCluster.getNameNodePort}/"
def withLocalSparkSession(tests: SparkSession => Any): Any = {
val baseDir = new File(PathUtils.getTestDir(getClass), "miniHDFS")
val conf = new HdfsConfiguration()
conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath)
val builder = new MiniDFSCluster.Builder(conf)
hdfsCluster = builder.nameNodePort(9000)
.manageNameDfsDirs(true)
.manageDataDfsDirs(true)
.format(true)
.build()
hdfsCluster.waitClusterUp()
val testSpark = SparkSession
.builder()
.master("local")
.appName("Test App")
.config("spark.sql.warehouse.dir", s"${nameNodeURI}spark-warehouse/")
.getOrCreate()
tests(testSpark)
}
def stopHdfs(): Unit = hdfsCluster.shutdown(true, true)
override def afterAll(): Unit = stopHdfs()
}
在编写测试时,我继承了这个特性,然后编写如下的测试用例-
class SampleSpec extends FunSuite with TestSparkSession {
withLocalSparkSession {
testSpark =>
import testSpark.implicits._
// Test 1 Here
// Test 2 Here
}
}
每次运行一个测试类时,一切都正常。但是当你一下子把它们都跑的时候 java.net.BindException: Address already in use
. 这应该意味着在执行下一组测试时,已经创建的hdfscluster还没有关闭。这就是为什么它无法创建另一个绑定到同一端口的端口。但后来()我停止了hfdscluster。
我的问题是我是否可以共享hdfs集群和spark会话的单个示例,而不是每次都初始化它?我试图提取出方法之外的初始化,但它仍然引发相同的异常。即使我不能共享它,我怎样才能正确地停止我的集群并在下一个测试类执行时重新初始化它?
另外,请让我知道我编写使用sparksession和hdfs存储的“单元”测试用例的方法是否正确。
任何帮助都将不胜感激。
1条答案
按热度按时间daolsyd01#
我通过在companion对象中创建hdfs集群来解决这个问题,这样它就为所有测试套件创建了一个hdfs集群示例。