我正在使用spark streaming通过创建streamingcontext从twitter获取推文,如下所示: val ssc = new StreamingContext("local[3]", "TwitterFeed",Minutes(1))
创建twitter流为: val tweetStream = TwitterUtils.createStream(ssc, Some(new OAuthAuthorization(Util.config)),filters)
然后将其保存为文本文件 tweets.repartition(1).saveAsTextFiles("/tmp/spark_testing/")
问题是tweets被保存为基于批处理时间的文件夹,但是我需要在同一个文件夹中保存每个批的所有数据。
有什么解决办法吗?
谢谢
1条答案
按热度按时间carvr3hs1#
我们可以使用sparksql新的Dataframe保存api来实现这一点,该api允许附加到现有的输出。默认情况下,saveastextfile无法保存到包含现有数据的目录(请参阅https://spark.apache.org/docs/latest/sql-programming-guide.html#save-模式)。https://spark.apache.org/docs/latest/streaming-programming-guide.html#dataframe-sql操作包括如何设置sparksql上下文以用于sparkstreaming。
假设使用sqlcontextsingleton从指南中复制该部分,则生成的代码如下所示:
(注意上面的示例使用json保存结果,但是您也可以使用不同的输出格式)。