我使用的是spark 2.4.1版本和Java8。
我有这样的场景:
将提供属性文件中要处理的分类器列表。
这些分类器决定了要提取和处理的数据。
如下所示:
val classifiers = Seq("classifierOne","classifierTwo","classifierThree");
for( classifier : classifiers ){
// read from CassandraDB table
val acutalData = spark.read(.....).where(<classifier conditition>)
// the data varies depend on the classifier passed in
// this data has many fields along with fieldOne, fieldTwo and fieldThree
取决于分类器,我需要过滤数据。目前我正在做以下工作:
if(classifier.===("classifierOne")) {
val classifierOneDs = acutalData.filter(col("classifierOne").notEqual(lit("")).or(col("classifierOne").isNotNull()));
writeToParquet(classifierOneDs);
} else if(classifier.===("classifierTwo")) {
val classifierTwoDs = acutalData.filter(col("classifierTwo").notEqual(lit("")).or(col("classifierTwo").isNotNull()));
writeToParquet(classifierOneDs);
} else (classifier.===("classifierThree")) {
val classifierThreeDs = acutalData.filter(col("classifierThree").notEqual(lit("")).or(col("classifierThree").isNotNull()));
writeToParquet(classifierOneDs);
}
有没有办法避免 if
- else
在这堵?有没有其他方法可以做到这一点?
2条答案
按热度按时间jucafojl1#
您的问题似乎更多地是关于如何构造应用程序,而不是spark本身。实际上有两部分。
有没有办法避开这里的if-else障碍?
“避免”?在什么意义上?spark无法神奇地“发现”您的分布式处理方式。你应该帮点忙。
对于这种情况,我建议使用一个包含所有可能的过滤条件及其名称的查找表,例如。
为了使用它,你只需遍历所有的分类器(或者根据需要查找尽可能多的分类器)。
queries.par.foreach(writeToParquet)
oug3syen2#
所以,您需要根据分类器名称选择要检查的列,该列将作为列表传递?
在遍历列表时,无论如何都要遍历所有分类器。如果列名可以和实际的分类器名不同,则可以将其设置为
List[Classifier]
,在哪里Classifier
有点像case class Classifier(colName: String, classifierName: String)