用一段代码从scala spark读取两个不同的文件

pgccezyw  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(369)

我有两组文件和模式,我想在一组代码中运行它们。
这是我的密码:

val file_path = "file1" // I want to pass two files (file1, file2)
val rdd = spark.sparkContext.wholeTextFiles(file_path)
val validJsonRdd = rdd.flatMap(_._2.replace(" ", "").replace("\n", "").replace(":value", ":\"value\"").replace("}{", "}\n{").split("\n"))
val dataframe = spark
      .read
      .option("multiLine", true)
      .schema(Schema1) // I want to put schema1 for file1 and schema2 for file2
      .json(validJsonRdd)
      .show()

因此,根据上面的代码,我想运行两个不同的模式及其相应的文件。

46scxncf

46scxncf1#

您可以将应用程序参数传递到spark submit中,如下所示。。
spark-submit的通用语法

./bin/spark-submit \
      --class <main-class> \
      --master <master-url> \
      --deploy-mode <deploy-mode> \
      --conf <key>=<value> \
      ... # other options
      <application-jar> \
      [application-arguments]

您可以使用file.txtschema1.txt(或)file1.txtscehma2.txt来代替[application arguments]
file.txt是第一个参数,schema1.txt是第二个参数。
在应用程序代码中,可以

def main(args: Array[String]) : Unit = {
     val inputFile = args(0);
     val schemaFile = args(1);
     val schemaFileasString = // open FileInoutStream and read whole schema data from**schemaFile**as string 
     val schema = SchemaConverter.convertContent(schemaFileasString)
     //create spark session and provide all the parameter properly
    import spark.implicits._
    val rdd = spark.sparkContext.wholeTextFiles(inputFile)
    val validJsonRdd = rdd.flatMap(_._2.replace(" ", "").replace("\n", "").replace(":value", ":\"value\"").replace("}{", "}\n{").split("\n"))

     spark.read
      .option("multiLine", true)
      .schema(schema)
      .json(validJsonRdd)
      .show()
   }

参考文献:
https://github.com/zalando-incubator/spark-json-schema
https://spark.apache.org/docs/latest/submitting-applications.html

相关问题