对于一个类项目,我需要一个spark java程序作为kafka使用者进行侦听,并将kafka主题接收到的所有消息写入一个文件(例如“/user/zaydh/my\u text\u file.txt”)。
我可以作为一个 JavaPairReceiverInputDStream
对象;我也可以把它转换成 JavaDStream<String>
(这来自spark kafka的例子)。
但是,我找不到一个好的java语法来将这些数据写入一个基本上是单个日志文件的文件中。我试过用 foreachRDD
上 JavaDStream
对象,但我找不到一个干净的、并行的安全方法将其放入单个日志文件。
我理解这种方法是非传统或非理想的,但它是一种要求。非常感谢您的指导。
1条答案
按热度按时间xpcnnkqh1#
当你想到一个流时,你必须把它看作是一个不会停止提供数据的东西。因此,如果spark streaming有办法将所有的rdd保存到一个文件中,它将继续增长到一个巨大的大小(而且流不应该停止,还记得吗?:)
但在这种情况下,您可以使用rdd的saveastextfile实用程序,它将根据创建流式处理上下文时指定的批处理间隔在输出目录中创建许多文件
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1))
然后可以使用how-to-merge-all-text-files-in-a-directory-into-one之类的方法将这些文件部分合并到一个文件中