我正在写一个程序来保存时间序列数据 kafka
进入hadoop。我把目录结构设计成这样:
event_data
|-2016
|-01
|-data01
|-data02
|-data03
|-2017
|-01
|-data01
由于是守护进程任务,我编写了一个基于lru的管理器来管理打开的文件并及时关闭非活动文件,以避免资源泄漏,但是收入数据流不是按时间排序的,重新打开已有的文件来添加新数据是很常见的。
我试过用 FileSystem#append()
方法打开 OutputStream
当文件存在时,但它在我的hdfs集群上运行时出错(对不起,我不能在这里提供具体的错误,因为它是几个月前的,现在我尝试了另一个解决方案)。
然后,我使用另一种方法来实现我的目标:当存在同名文件时,为文件名添加一个序列后缀。现在我的hdfs中有很多文件。它看起来很脏。
我的问题是:在这种情况下,最好的做法是什么?
1条答案
按热度按时间gzszwxb41#
很抱歉,这不是一个直接的答案,你的编程问题,但如果你是开放的所有选项,而不是自己实现它,我想与你分享我们的经验与fluentd和它的hdfs(webhdfs)输出插件。
fluentd是一个开源的、可插入的数据采集器,您可以通过它轻松地构建数据管道,它将从输入读取数据,处理数据,然后将数据写入指定的输出,在您的场景中,输入是
kafka
输出为HDFS
. 你需要做的是:配置fluentd
input
在fluentdkafka插件之后,您将配置source
放弃你的Kafka/主题信息启用
webhdfs
以及append
操作为您的hdfs集群,您可以找到如何做到以下hdfs(webhdfs)输出插件配置您的
match
将数据写入的部分HDFS
,插件文档页面上有一个例子。对于按月份和日期划分数据,可以配置path
带有时间片占位符的参数,类似于:path "/event_data/%Y/%m/data%d"
使用此选项来收集数据,然后可以编写mapreduce作业来执行etl或任何您喜欢的操作。我不知道这是否适合你的问题,只是在这里提供一个更多的选择。