scala——用apachespark解析大型结构化文件的最佳方法

czfnxgou  于 2021-06-27  发布在  Hive
关注(0)|答案(2)|浏览(357)

我有一个巨大的文本文件(以gbs为单位),每行都有计划文本数据,需要解析并提取到一个结构中进行进一步处理。每一行都有200个字符长的文本,我有一个正则表达式来解析每一行并将其分成不同的组,这些组稍后将保存到一个平面列数据中
数据样本

1759387ACD06JAN1910MAR191234567ACRT

正则表达式

(.{7})(.{3})(.{7})(.{7})(.{7})(.{4})

数据结构

Customer ID, Code, From Date, To Date, TrasactionId, Product code
1759387,     ACD,  06JAN19,   10MAR19,  1234567,     ACRT

请建议一个最佳的方法来解析这个巨大的数据并推送到内存网格,当调用相应的api时,spark jobs将再次使用这个网格进行进一步的处理。

yvgpqqbh

yvgpqqbh1#

您需要告诉spark要读取哪个文件,以及在读取时如何处理内容。
举个例子:

val numberOfPartitions = 5 // this needs to be optimized based on the size of the file and the available resources (e.g. memory)
val someObjectsRDD: RDD[SomeObject] =
        sparkContext.textFile("/path/to/your/file", numberOfPartitions)
            .mapPartitions( 
                { stringsFromFileIterator =>
                  stringsFromFileIterator.map(stringFromFile => //here process the raw string and return the result)
                }
                , preservesPartitioning = true
              )

在代码段中,someobject是一个具有问题的数据结构的对象

rjjhvcjd

rjjhvcjd2#

您可以使用df方法。使用-copyfromlocal命令将串行文件复制到hdfs,并使用下面的代码解析每个记录
我假设gireesh.txt中的示例记录如下

1759387ACD06JAN1910MAR191234567ACRT
2759387ACD08JAN1910MAY191234567ACRY
3759387ACD03JAN1910FEB191234567ACRZ

Spark代码

import org.apache.log4j.{Level, Logger}
import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.Encoders._
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

object Gireesh {
  def main(args: Array[String]) {
    Logger.getLogger("org").setLevel(Level.ERROR)
    val spark = SparkSession.builder().appName("Operations..").master("local[*]").getOrCreate()
    import spark.implicits._
    val pat="""(.{7})(.{3})(.{7})(.{7})(.{7})(.{4})""".r
    val headers = List("custid","code","fdate","tdate","tranid","prdcode")
    val rdd = spark.sparkContext.textFile("in/gireesh.txt")
      .map( x => {
              val y = scala.collection.mutable.ArrayBuffer[String]()
              pat.findAllIn(x).matchData.foreach( m=> y.appendAll(m.subgroups))
           (y(0).toLong,y(1),y(2),y(3),y(4).toLong,y(5))
          }
      )
    val df = rdd.toDF(headers:_*)
    df.printSchema()
    df.show(false)

  }
}

给出以下结果。

root
 |-- custid: long (nullable = false)
 |-- code: string (nullable = true)
 |-- fdate: string (nullable = true)
 |-- tdate: string (nullable = true)
 |-- tranid: long (nullable = false)
 |-- prdcode: string (nullable = true)

+-------+----+-------+-------+-------+-------+
|custid |code|fdate  |tdate  |tranid |prdcode|
+-------+----+-------+-------+-------+-------+
|1759387|ACD |06JAN19|10MAR19|1234567|ACRT   |
|2759387|ACD |08JAN19|10MAY19|1234567|ACRY   |
|3759387|ACD |03JAN19|10FEB19|1234567|ACRZ   |
+-------+----+-------+-------+-------+-------+

编辑1:
您可以在一个单独的函数中使用Map“transformation”,如下所示。

def parse(record:String) = {
  val y = scala.collection.mutable.ArrayBuffer[String]()
  pat.findAllIn(record).matchData.foreach( m=> y.appendAll(m.subgroups))
  (y(0).toLong,y(1),y(2),y(3),y(4).toLong,y(5))
}
val rdd = spark.sparkContext.textFile("in/gireesh.txt")
  .map( x =>  parse(x) )
val df = rdd.toDF(headers:_*)
df.printSchema()

相关问题