用布尔值标记sparkDataframe中的错误行

qnzebej0  于 2021-05-29  发布在  Hadoop
关注(0)|答案(2)|浏览(394)

我在试着用spark数据框。利用来自级联框架的先前知识,该框架具有陷阱机制,可以将错误行(具有空值的行)过滤到称为陷阱的单独抽头中。那些不知道的人让我说清楚。当你从一个文本文件中得到一个错误的行时。框架要么从整个数据中剔除坏行,要么停止执行。现在在apachespark中,我观察到错误的行并没有妨碍执行。这很好,但当涉及到从数据中获得业务洞察力时,数据的质量确实很重要!
因此,我有一个文本文件,其中有一堆行(您可以随意选取任何数据集),其中很少有记录包含空值。现在我用spark.read.csv将文本文件加载到数据框中。现在,我要做的是分析dataframe并动态创建一个名为“ismyrowbad”的列,逻辑将一次分析每一行,如果逻辑发现有空值的行,它将该行上的ismyrowbad列标记为true,并将没有空值的列标记为true,对应的列ismyrowbad对于干净的purticular行应该有false。
为您提供传入和传出数据集的概述
传入Dataframe

  1. fname,lname,age
  2. will,smith,40
  3. Dwayne,Nunn,36
  4. Aniruddha,Sinha,
  5. Maria,,22

传出Dataframe

  1. fname,lname,age,isMyRowBad
  2. will,smith,40,false
  3. Dwayne,Nunn,36,false
  4. Aniruddha,Sinha,,true
  5. Maria,,22,true

上面的分类好行和坏行的方法可能看起来有点愚蠢,但它确实有意义,因为我不需要多次运行筛选操作。让我们看看,怎么样?
假设我有一个名为indf的dataframe作为inputdf,analyseddf:(dataframe,dataframe)作为output df tuple
现在,我尝试了这部分代码

  1. val analyzedDf: (DataFrame, DataFrame) = (inputDf.filter(_.anyNull),inputDf.filter(!_.anyNull))

此代码分隔好行和坏行。我同意!但由于过滤器运行了两次,这会降低性能,这意味着过滤器将在整个数据集上迭代两次如果您觉得在考虑50个字段和至少584000行(即250MB的数据)时,运行两次filter是有意义的,那么您可以反驳这一点
还有这个

  1. val analyzedDf: DataFrame = inputDf.select("*").withColumn("isMyRowBad", <this point, I am not able to analyze row>

上面的代码片段显示了我无法找出如何扫描整行并用布尔值将行标记为坏的地方。
希望你们都明白我的目标是什么。如果您在代码片段中发现语法错误,请忽略,因为我马上在此处键入了这些错误(以后编辑时会更正)
请给我一个提示(一个小的代码片段或伪代码就足够了)如何继续挑战。如果你不明白我要做什么,请联系我。
任何帮助都将不胜感激。提前谢谢!
p、 s:在bigdata/spark/hadoop/scala等网站上有很多优秀的人,请您纠正我可能(概念上)写错的任何一点
下面的代码顺便给了我一个解决方案。请看一看

  1. package aniruddha.data.quality
  2. import org.apache.spark.sql.{DataFrame, SparkSession}
  3. import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
  4. import org.apache.spark.sql.functions._
  5. /**
  6. * Created by aniruddha on 8/4/17.
  7. */
  8. object DataQualityCheck extends App {
  9. val spark = SparkSession.builder().master("local[*]").getOrCreate()
  10. import spark.implicits._
  11. val schema: StructType = StructType(List(
  12. StructField("fname", StringType, nullable = true),
  13. StructField("lname", StringType, nullable = true),
  14. StructField("age", IntegerType, nullable = true),
  15. StructField("pan", StringType, nullable = true),
  16. StructField("married", StringType, nullable = true)
  17. ))
  18. val inputDataFrame: DataFrame = spark
  19. .read
  20. .schema(schema)
  21. .option("header",true)
  22. .option("delimiter",",")
  23. .csv("inputData/infile")
  24. //inputDataFrame.show()
  25. val analysedDataFrame: DataFrame = inputDataFrame.select("*").withColumn("isRowBad", when($"pan".isNull||$"lname".isNull||$"married".isNull,true).otherwise(false))
  26. analysedDataFrame show
  27. }

输入

  1. fname,lname,age,pan,married
  2. aniruddha,sinha,23,0AA22,no
  3. balajee,venkatesh,23,0b96,no
  4. warren,shannon,72,,
  5. wes,borland,63,0b22,yes
  6. Rohan,,32,0a96,no
  7. james,bond,66,007,no

输出

  1. +---------+---------+---+-----+-------+--------+
  2. | fname| lname|age| pan|married|isRowBad|
  3. +---------+---------+---+-----+-------+--------+
  4. |aniruddha| sinha| 23|0AA22| no| false|
  5. | balajee|venkatesh| 23| 0b96| no| false|
  6. | warren| shannon| 72| null| null| true|
  7. | wes| borland| 63| 0b22| yes| false|
  8. | Rohan| null| 32| 0a96| no| true|
  9. | james| bond| 66| 007| no| false|
  10. +---------+---------+---+-----+-------+--------+

代码运行得很好,但我对when函数有问题。我们不能选择所有的列而不硬编码吗?

6gpjuf90

6gpjuf901#

据我所知,你不能这样做与内置的csv解析器。如果解析器遇到错误(failfast模式),您可以让它停止,但不能进行注解。
但是,您可以使用定制的csv解析器来实现这一点,它可以在一次传递中处理数据。除非我们想做一些聪明的类型自省,否则最简单的方法是创建一个helper类来注解文件的结构:

  1. case class CSVColumnDef(colPos: Int, colName: String, colType: String)
  2. val columns = List(CSVColumnDef(0,"fname","String"),CSVColumnDef(1,"lname","String"),CSVColumnDef(2,"age", "Int"))

接下来,我们需要一些函数来a)分割输入,b)从分割的数据中提取数据,c)检查行是否错误:

  1. import scala.util.Try
  2. def splitToSeq(delimiter: String) = udf[Seq[String],String](_.split(delimiter))
  3. def extractColumnStr(i: Int) = udf[Option[String],Seq[String]](s => Try(Some(s(i))).getOrElse(None))
  4. def extractColumnInt(i: Int) = udf[Option[Int],Seq[String]](s => Try(Some(s(i).toInt)).getOrElse(None))
  5. def isRowBad(delimiter: String) = udf[Boolean,String](s => {
  6. (s.split(delimiter).length != columns.length) || (s.split(delimiter).exists(_.length==0))
  7. })

要使用这些,我们首先需要读入文本文件(因为我没有文本文件,为了允许人们复制这个答案,我将创建一个rdd):

  1. val input = sc.parallelize(List(("will,smith,40"),("Dwayne,Nunn,36"),("Aniruddha,Sinha,"),("Maria,,22")))
  2. input.take(5).foreach(println)

有了这个输入,我们就可以创建一个带有单列的Dataframe,即原始行,并将拆分列添加到其中:

  1. val delimiter = ","
  2. val raw = "raw"
  3. val delimited = "delimited"
  4. val compDF = input.toDF(raw).withColumn(delimited, splitToSeq(delimiter)(col(raw)))

最后,我们可以提取前面定义的所有列,并检查行是否错误:

  1. val df = columns.foldLeft(compDF){case (acc,column) => column.colType match {
  2. case "Int" => acc.withColumn(column.colName, extractColumnInt(column.colPos)(col(delimited)))
  3. case _ => acc.withColumn(column.colName, extractColumnStr(column.colPos)(col(delimited)))
  4. }}.
  5. withColumn("isMyRowBad", isRowBad(delimiter)(col(raw))).
  6. drop(raw).drop(delimited)
  7. df.show
  8. df.printSchema

这个解决方案的优点是spark execution planner足够聪明,可以构建所有这些 .withColumn 单通道操作( map )在数据上,没有零洗牌。恼人的是,与使用漂亮闪亮的csv库相比,开发人员的工作量要大得多,我们需要以某种方式定义列。如果您想更聪明一点,可以从文件的第一行获取列名(提示: .mapPartitionsWithIndex ),并将所有内容解析为字符串。我们也不能定义一个case类来描述整个df,因为有太多的列以这种方式处理解决方案。希望这有帮助。。。

展开查看全部
e1xvtsh3

e1xvtsh32#

这可以使用udf来完成。尽管ben horsburgh给出的答案肯定是非常出色的,但是我们可以做到这一点,而不必深入研究Dataframe背后的内部架构。
下面的代码可以给你一个想法

  1. import org.apache.spark.sql.functions._
  2. import org.apache.spark.sql.types.{StringType, StructField, StructType}
  3. import org.apache.spark.sql.{DataFrame, Row, SparkSession}
  4. /**
  5. * Created by vaijnath on 10/4/17.
  6. */
  7. object DataQualityCheck extends App {
  8. val spark = SparkSession.builder().master("local[*]").getOrCreate()
  9. import spark.implicits._
  10. val schema: StructType = StructType(List(
  11. StructField("fname", StringType, nullable = true),
  12. StructField("lname", StringType, nullable = true),
  13. StructField("married", StringType, nullable = true)
  14. ))
  15. val inputDataFrame: DataFrame = spark
  16. .read
  17. .schema(schema)
  18. .option("header",false)
  19. .option("delimiter",",")
  20. .csv("hydrograph.engine.spark/testData/inputFiles/delimitedInputFile.txt")
  21. //inputDataFrame.show()
  22. def isBad(row:Row):Boolean={
  23. row.anyNull
  24. }
  25. val simplefun=udf(isBad(_:Row))
  26. val cols=struct(inputDataFrame.schema.fieldNames.map(e=> col(e)):_*)
  27. // println(cols+"******************") //for debugging
  28. val analysedDataFrame: DataFrame = inputDataFrame.withColumn("isRowBad", simplefun(cols))
  29. analysedDataFrame.show
  30. }

如果你遇到任何问题,请给我回电。我相信这个解决方案是合适的,因为您似乎在寻找使用dataframe的代码。
谢谢。

展开查看全部

相关问题