在分布式模式下使用hdfs连接器接收器避免来自kafka connect的小文件

5lhxktic  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(410)

我们有一个主题,消息速率为每秒1msg,有3个分区,我正在使用hdfs connector将数据以avro格式(默认)写入hdfs,它生成大小以kbs为单位的文件,因此我尝试在hdfs属性中更改以下属性。
“flush.size”:“5000”,“rotate.interval.ms”:“7200000”
但是输出仍然是小文件,所以我需要澄清以下几点来解决这个问题:
flush.size属性是强制性的吗?如果我们没有提到flus.size属性,数据是如何被刷新的?
如果我们提到flush size为5000,rotate interval为2小时,则在前3个时间间隔内,它每2小时刷新一次数据,但之后它会随机刷新数据,请查找文件创建的时间(19:14,21:14,23:15,01:15,06:59,08:59,12:40,14:40)--突出显示了不匹配的间隔。这是因为所提到的属性的过度使用吗?这就引出了第三个问题。
如果我们提到以下所有属性(flush.size、rotate.interval.ms、rotate.schedule.interval.ms),那么flush的首选项是什么
增加msg的速率和减少分区实际上是在显示正在刷新的数据的大小的增加,这是控制小文件的唯一方法吗?如果输入事件的速率是变化的并且不稳定,我们如何处理属性?
如果您能分享有关处理Kafka连接hdfs连接器的小文件的文档,那将是非常有帮助的,谢谢。

sh7euo9m

sh7euo9m1#

如果您使用的是基于时间的分区器,并且消息的时间戳不会一直增加,那么当单个writer任务看到间隔为的时间戳较小的消息时,它将转储文件 rotate.interval.ms 读取任何给定的记录。
如果您想拥有一致的每两小时分区窗口,那么您应该使用 rotate.interval.ms=-1 要禁用它 rotate.schedule.interval.ms 分区持续时间窗口内的某个合理数字。
e、 g.每2小时有7200条消息,不清楚每条消息有多大,但假设是1mb。然后,您将在一个缓冲区中保存约7gb的数据,并且需要调整连接堆的大小以保存这么多的数据。
呈现的顺序是
定时轮换,从一小时的最高峰开始
刷新大小或“基于消息”的时间轮换,以先发生的为准,或者有一条记录在当前批处理开始之前显示为“在”
我相信对于存储连接器来说,冲洗尺寸是必须的
总的来说,像uber的hudi或者camus sweeper以前的kafka hdfs工具这样的系统更适合处理小文件。连接sink任务只关心从kafka消费,并写入下游系统;该框架本身并不认为hadoop更喜欢较大的文件。

相关问题