如何在spark dataframe中按列或按行验证大型csv文件

r1wp621o  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(379)

我有一个10gb或更大的数据文件,有150列,其中我们需要用不同的规则验证每个数据(datatype/format/null/domain value/primary key..),最后创建两个输出文件一个有成功数据,另一个有错误详细信息的错误数据。我们需要移动错误文件中的行,如果任何列在第一时间有错误,则无需进一步验证。
我正在spark数据框中读取一个文件,我们是按列还是按行验证它,通过哪种方式获得最佳性能?

8ftvxx2r

8ftvxx2r1#

回答你的问题
我在sparkDataframe中读取一个文件,我们是按列还是按行验证它,通过哪种方式获得最佳性能?
dataframe是一个分布式数据集合,它被组织为分布在集群中的一组行,spark中定义的大部分转换都应用于处理row对象的行。

Psuedo code
 import spark.implicits._
  val schema = spark.read.csv(ip).schema

  spark.read.textFile(inputFile).map(row => {
      val errorInfo : Seq[(Row,String,Boolean)] = Seq()
      val data = schema.foreach(f => {
        // f.dataType //get field type and have custom logic on field type
        // f.name // get field name i.e., column name
        // val fieldValue = row.getAs(f.name) //get field value and have check's on field value on field type
        // if any error in field value validation then populate @errorInfo info object i.e (row,"error_info",false)
        // otherwise i.e (row,"",true)
      })
      data.filter(x => x._3).write.save(correctLoc)
      data.filter(x => !x._3).write.save(errorLoc)
    })

相关问题