Spark Scala grep utility

Asked by mwg9r5ms on 2021-05-27, in Spark

I have about 700 Avro files, each roughly 2 GB. I need to grep them for a unix id, say '129384755', and find out which Avro file contains that id.
Since a plain unix grep takes far too long, I tried writing Spark Scala code for it; here is my code.

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

object SparkGrep {
    def main(args: Array[String]) {
        if (args.length < 3) {
            System.err.println("Usage: SparkGrep <host> <input_file> <match_term>")
            System.exit(1)
        }
        val conf = new SparkConf().setAppName("SparkGrep").setMaster(args(0))
        val sc = new SparkContext(conf)
        val inputFile = sc.textFile(args(1), 2).cache()
        val matchTerm : String = args(2)
        val numMatches = inputFile.filter(line => line.contains(matchTerm)).count()
        println("%s lines in %s contain %s".format(numMatches, args(1), matchTerm))
        System.exit(0)
    }
}

The problem I am running into:
I ran this code over the 700 files with the input path argument set to ///.avro. I expected output telling me that 4 lines in ///part-123.avro contain my id "129384755". Instead I get "4 lines in ///.avro contain ...", so I have no way of telling which of the 700 files contains my search term (the id is unique and occurs in only one of the 700 files). Could you help me with this? It would be really appreciated.

ndh0cuux #1

There are quite a few problems with your question, and the wording is confusing; please try to phrase it more clearly next time.
"I ran this code on 700 files": I assume this means args(1) is a directory path with a wildcard.
There are multiple ways to do this in Spark. If you want to stick with RDDs, the files have to be small, maybe a thousand lines or so, because wholeTextFiles reads each file into memory as a single string. Here is that approach; below is my spark-shell output:

scala> val matchTerm = ".txt"
matchTerm: String = .txt

scala> val fileNames = "/testFolder/*" // path to the directory where the files are
fileNames: String = /testFolder/*

scala>  val inputFile = sc.wholeTextFiles(fileNames)
inputFile: org.apache.spark.rdd.RDD[(String, String)] = /testFolder/* MapPartitionsRDD[28] at wholeTextFiles at <console>:26

scala> val fileCounts = inputFile.map{ case (a,b) => { val lines = b.split("\n"); (a, lines.filter(_.contains(matchTerm)).length) }}
fileCounts: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[29] at map at <console>:27

scala> fileCounts.foreach(x => println("file %s contains %s lines with the term %s".format(x._1, x._2, matchTerm)))

The output looks like this (in local mode; on a cluster, println inside foreach goes to the executor logs):

file /testFolder/testFile.txt contains 10 lines with the term .txt
file /testFolder/testFile2.txt contains 5 lines with the term .txt
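
Since the asker's files are about 2 GB each, wholeTextFiles would pull every file into memory as one string. Below is a minimal per-file sketch that streams lines instead; it assumes the files are reachable through Hadoop's FileSystem API, and the /testFolder/* glob is just a placeholder path:

import org.apache.hadoop.fs.{FileSystem, Path}

// List the matching files on the driver.
val fs = FileSystem.get(sc.hadoopConfiguration)
val files = fs.globStatus(new Path("/testFolder/*")).map(_.getPath.toString)

// One Spark job per file: textFile streams each file line by line,
// so nothing has to fit in memory at once.
val hits = files.filter { f =>
  !sc.textFile(f).filter(_.contains(matchTerm)).isEmpty()
}
hits.foreach(name => println(s"$name contains $matchTerm"))

Note that 700 files means 700 small jobs, which is acceptable for a one-off search.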

That said, I would suggest you use DataFrames instead:

scala> val fileNames = "/testFolder/*"
fileNames: String = /testFolder/*

scala> val matchTerm = ".txt"
matchTerm: String = .txt

scala> import org.apache.spark.sql.functions.{col, input_file_name}
import org.apache.spark.sql.functions.{col, input_file_name}

scala> val df = spark.read.text(fileNames).withColumn("inputFile", input_file_name)
df: org.apache.spark.sql.DataFrame = [value: string, inputFile: string]

scala> df.filter( col("value").contains(matchTerm)).groupBy("inputFile").count.show(false)

The output looks like this:

+--------------------------+-----+
|inputFile                 |count|
+--------------------------+-----+
|/testFolder/testFile.txt  |10   |
|/testFolder/testFile2.txt |5    |
+--------------------------+-----+

Both files, testFile.txt and testFile2.txt, are in the /testFolder directory.
Here are the contents of those files:
testFile.txt:

fileNosomeb233136.txt
fileNosomeb7559.txt
fileNosomeb340542.txt
fileNosomeb457514.txt
fileNosomeb491638.txt
fileNosomeb27417.txt
fileNosomeb310232.txt
ssamefileNo74582.txt
fileNosomeb77367.txt
fileNosomeb407264.txt

testFile2.txt:

fileNosomeb233136.txt
fileNosomeb7559.txt
fileNosomeb340542.txt
fileNosomeb457514.txt
fileNosomeb491638.txt
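
Since the original question is about Avro files rather than plain text, the same DataFrame trick applies with the spark-avro data source. A hedged sketch, assuming Spark 2.4+ with the org.apache.spark:spark-avro package on the classpath; the path and the "id" column are hypothetical placeholders for the asker's actual layout and schema:

import org.apache.spark.sql.functions.{col, input_file_name}

// Read all Avro files and tag each row with its source file.
val avroDf = spark.read.format("avro")
  .load("/path/to/avroDir/*.avro")
  .withColumn("inputFile", input_file_name())

// "id" is a hypothetical column name; use whichever field holds the unix id.
avroDf.filter(col("id") === "129384755")
  .select("inputFile")
  .distinct()
  .show(false)

Filtering on the actual column is much cheaper than a blind text grep, since only matching rows ever leave the executors.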
