我是spark的新手,目前正在解决一个与在上下文时间之后将spark流的结果保存到文件相关的问题。所以问题是:我希望一个查询运行60秒,并将它在这段时间内读取的所有输入保存到一个文件中,并且能够定义文件名以供将来处理。
起初,我认为下面的代码是可行的:
sc.socketTextStream("localhost", 12345)
.foreachRDD(rdd -> {
rdd.saveAsTextFile("./test");
});
然而,在运行之后,我意识到它不仅为每次读取的输入保存了不同的文件-(想象一下,我在该端口以随机的速度生成随机数),因此,如果在一秒钟内读取1,文件将包含1个数字,但如果读取更多,文件将包含它们,我不是只写一个文件,其中包含60年代的所有值,而是无法命名该文件,因为saveastextfile中的参数是所需的目录。
所以我想问一下是否有spark本机解决方案,这样我就不必用“java技巧”来解决它了,比如:
sc.socketTextStream("localhost", 12345)
.foreachRDD(rdd -> {
PrintWriter out = new PrintWriter("./logs/votes["+dtf.format(LocalDateTime.now().minusMinutes(2))+","+dtf.format(LocalDateTime.now())+"].txt");
List<String> l = rdd.collect();
for(String voto: l)
out.println(voto + " "+dtf.format(LocalDateTime.now()));
out.close();
});
我搜索了类似问题的spark文档,但找不到解决方案:/thanks for your time:)
1条答案
按热度按时间uttx8gqw1#
下面是使用新的sparkapi使用socket流数据的模板。
对于代码解释,请阅读代码内联注解
java版本