寻找一种连续处理写入hdfs的文件的方法

goucqfw6  于 2021-05-29  发布在  Hadoop
关注(0)|答案(3)|浏览(318)

我正在寻找一种可以:
监视hdfs dir中的新文件,并在它们出现时进行处理。
它还应该处理作业/应用程序开始工作之前目录中的文件。
它应该有检查点从它离开的地方继续,以防重新启动。
我看了apachespark:它可以读取新添加的文件,并可以处理重新启动以从原来的位置继续。我找不到一种方法让它同时处理同一作业范围内的旧文件(所以只有1和3)。
我看了ApacheFlink:它确实处理新旧文件。但是,一旦作业重新启动,它将再次开始处理所有这些作业(1和2)。
这是一个非常常见的用例。我在spark/flink中遗漏了什么使之成为可能的东西吗?这里还有别的工具可以用吗?

cbwuti44

cbwuti441#

我建议您稍微修改一下文件摄取并合并kafka,以便每次在hdfs中放入新文件时,都将消息放入kafka队列。然后使用spark streaming从队列读取文件名,然后从hdfs读取文件并进行处理。
勾选是一个真正的痛苦,也不能保证你想要什么。Kafka与Spark将能够保证准确一次。
Flume有一个源头,你也可以看看。

doinxwow

doinxwow2#

最好的方法是维护状态机。维护包含所有已处理文件的表或文件。
启动时的应用程序读取文件列表,并在set/map中维护相同的文件列表。任何已处理的新文件/旧文件都可以根据相同的文件进行查看和验证。
此外,摄取文件夹还需要维护文件的某些状态。与已处理的文件一样,一些ext.failed文件将被重命名为failed文件夹,被拒绝为拒绝文件夹。等
你可以用spark/flink做所有这些。。技术不是这里的瓶颈

yvfmudvl

yvfmudvl3#

使用flink streaming,您可以完全按照建议处理目录中的文件,当您重新启动时,它将从停止的位置开始处理。这称为连续文件处理。
你唯一要做的是1)为你的作业启用检查点,2)用以下方法启动你的程序:

Time period = Time.minutes(10)
    env.readFile(inputFormat, "hdfs:// … /logs",
                 PROCESS_CONTINUOUSLY, 
                 period.toMilliseconds, 
                 FilePathFilter.createDefaultFilter())

这个特性是相当新的,在dev邮件列表中有一个关于如何进一步改进其功能的活跃讨论。
希望这有帮助!

相关问题