dataframe—如何迭代文件并对其执行操作—scala spark

jyztefdp  于 2021-05-18  发布在  Spark
关注(0)|答案(0)|浏览(256)

我正在从一个目录中一个接一个地读取1000个.eml文件(消息/电子邮件文件),并使用javax.mailapi对它们进行解析和提取值,最后将它们存储到一个Dataframe中。示例代码如下:

var x = Seq[DataFrame]()

val emlFiles = getListOfFiles("tmp/sample")
val fileCount = emlFiles.length
val fs = FileSystem.get(sc.hadoopConfiguration)

for (i <- 0 until fileCount){
    var emlData = spark.emptyDataFrame
    val f = new File(emlFiles(i))
    val fileName = f.getName()
    val path = Paths.get(emlFiles(i))
    val session = Session.getInstance(new Properties())
    val messageIn = new FileInputStream(path.toFile())
    val mimeJournal = new MimeMessage(session, messageIn)
    // Extracting Metadata
    val Receivers = mimeJournal.getHeader("From")(0)
    val Senders = mimeJournal.getHeader("To")(0)
    val Date = mimeJournal.getHeader("Date")(0)
    val Subject = mimeJournal.getHeader("Subject")(0)
    val Size = mimeJournal.getSize
    emlData =Seq((fileName,Receivers,Senders,Date,Subject,Size)).toDF("fileName","Receivers","Senders","Date","Subject","Size")
    x = emlData +: x
}

问题是我使用for循环来做同样的事情,这需要花费很多时间。有没有办法打破for循环并读取文件?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题