如何将spark流数据保存到文件

vawmfj5a  于 2021-05-29  发布在  Spark
关注(0)|答案(1)|浏览(694)

我是spark的新手,目前正在解决一个与在上下文时间之后将spark流的结果保存到文件相关的问题。所以问题是:我希望一个查询运行60秒,并将它在这段时间内读取的所有输入保存到一个文件中,并且能够定义文件名以供将来处理。
起初,我认为下面的代码是可行的:

sc.socketTextStream("localhost", 12345)
                .foreachRDD(rdd -> {
                    rdd.saveAsTextFile("./test");
                });

然而,在运行之后,我意识到它不仅为每次读取的输入保存了不同的文件-(想象一下,我在该端口以随机的速度生成随机数),因此,如果在一秒钟内读取1,文件将包含1个数字,但如果读取更多,文件将包含它们,我不是只写一个文件,其中包含60年代的所有值,而是无法命名该文件,因为saveastextfile中的参数是所需的目录。
所以我想问一下是否有spark本机解决方案,这样我就不必用“java技巧”来解决它了,比如:

sc.socketTextStream("localhost", 12345)
                .foreachRDD(rdd -> {
                    PrintWriter out = new PrintWriter("./logs/votes["+dtf.format(LocalDateTime.now().minusMinutes(2))+","+dtf.format(LocalDateTime.now())+"].txt");
                    List<String> l = rdd.collect();
                    for(String voto: l)
                        out.println(voto + "    "+dtf.format(LocalDateTime.now()));
                    out.close();
                });

我搜索了类似问题的spark文档,但找不到解决方案:/thanks for your time:)

uttx8gqw

uttx8gqw1#

下面是使用新的sparkapi使用socket流数据的模板。

import org.apache.spark.sql.streaming.{OutputMode, Trigger}

object ReadSocket {

  def main(args: Array[String]): Unit = {
    val spark = Constant.getSparkSess

    //Start reading from socket
    val dfStream = spark.readStream
      .format("socket")
      .option("host","127.0.0.1") // Replace it your socket host
      .option("port","9090")
      .load()

    dfStream.writeStream
      .trigger(Trigger.ProcessingTime("1 minute")) // Will trigger 1 minute
      .outputMode(OutputMode.Append) // Batch will processed for the data arrived in last 1 minute
      .foreachBatch((ds,id) => { //
        ds.foreach(row => { // Iterdate your data set
          //Put around your File generation logic
          println(row.getString(0)) // Thats your record
        })
      }).start().awaitTermination()
  }

}

对于代码解释,请阅读代码内联注解
java版本

import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.streaming.Trigger;

public class ReadSocketJ {

    public static void main(String[] args) throws StreamingQueryException {
        SparkSession spark = Constant.getSparkSess();

        Dataset<Row> lines = spark
                .readStream()
                .format("socket")
                .option("host", "127.0.0.1") // Replace it your socket host
                .option("port", "9090")
                .load();

        lines.writeStream()
                .trigger(Trigger.ProcessingTime("5 seconds"))
                .foreachBatch((VoidFunction2<Dataset<Row>, Long>) (v1, v2) -> {
                    v1.as(Encoders.STRING())
                            .collectAsList().forEach(System.out::println);
                }).start().awaitTermination();

    }
}

相关问题