spark流:将dstream批处理加入到单个输出文件夹中

xj3cbfub  于 2021-06-02  发布在  Hadoop
关注(0)|答案(1)|浏览(301)

我正在使用spark streaming通过创建streamingcontext从twitter获取推文,如下所示: val ssc = new StreamingContext("local[3]", "TwitterFeed",Minutes(1)) 创建twitter流为: val tweetStream = TwitterUtils.createStream(ssc, Some(new OAuthAuthorization(Util.config)),filters) 然后将其保存为文本文件 tweets.repartition(1).saveAsTextFiles("/tmp/spark_testing/") 问题是tweets被保存为基于批处理时间的文件夹,但是我需要在同一个文件夹中保存每个批的所有数据。
有什么解决办法吗?
谢谢

carvr3hs

carvr3hs1#

我们可以使用sparksql新的Dataframe保存api来实现这一点,该api允许附加到现有的输出。默认情况下,saveastextfile无法保存到包含现有数据的目录(请参阅https://spark.apache.org/docs/latest/sql-programming-guide.html#save-模式)。https://spark.apache.org/docs/latest/streaming-programming-guide.html#dataframe-sql操作包括如何设置sparksql上下文以用于sparkstreaming。
假设使用sqlcontextsingleton从指南中复制该部分,则生成的代码如下所示:

data.foreachRDD{rdd =>
  val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
  // Convert your data to a DataFrame, depends on the structure of your data
  val df = ....
  df.save("org.apache.spark.sql.json", SaveMode.Append, Map("path" -> path.toString))
}

(注意上面的示例使用json保存结果,但是您也可以使用不同的输出格式)。

相关问题