flume-kafkachannel跨分区平衡消息

u3r8eeie  于 2021-06-04  发布在  Flume
关注(0)|答案(1)|浏览(369)

我设置了一个flume代理,它使用spooldir命令从目录中读取csv文件。
我正在使用通道类型kafkachannel,以便将这些消息推送到带有10个分区的kafka主题上,稍后可以由spark应用程序进行处理。
我遇到的问题是,每个文件都被写入Kafka主题的一个分区。有些文件比其他文件大得多,这意味着消息在主题的分区中分布得非常不均匀。这使得为我的spark执行器分配正确数量的资源变得异常困难,因为有些执行器最终完成了所有繁重的工作,而另一些执行器则坐在那里等待将一些日志添加到它们的分区中。
有没有办法在flume中配置kafkachannel来平衡主题分区中的消息,或者限制一次发送的消息数量,从而将负载分散到所有可用分区中?
我在flume中使用了以下配置选项,但没有成功:

a1.channels.kafkaChannel.capacity = 100
a1.channels.kafkaChannel.transactionCapacity = 100
a1.channels.kafkaChannel.batch.size = 100

kafkachannel源代码经过了细微的修改以满足我的需要,但此处指定的默认配置选项仍然可用:http://flume.apache.org/flumeuserguide.html#kafka-渠道
完整配置文件(我已删除主机名和其他密钥信息)

a1.sources = src
a1.channels = kafkaChannel

a1.sources.src.type = spooldir
a1.sources.src.channels = kafkaChannel
a1.sources.src.spoolDir = /data/silk/flume/V5
a1.sources.src.fileHeader = true
a1.sources.src.trackerDir = .flumespool
a1.sources.src.consumeOrder = oldest
a1.sources.src.deletePolicy = immediate
a1.sources.src.decodeErrorPolicy = REPLACE
a1.sources.src.pollDelay = 12000

a1.channels.kafkaChannel.type = com.example.flume.channel.kafka.KafkaChannel
a1.channels.kafkaChannel.brokerList = host1:9092,host2:9092,host3:9092
a1.channels.kafkaChannel.topic = TEST-TOPIC
a1.channels.kafkaChannel.capacity = 100
a1.channels.c1.transactionCapacity = 100
a1.channels.kafkaChannel.zookeeperConnect = host1:2181,host2:2181,host3:2181
a1.channels.kafkaChannel.parseAsFlumeEvent = false

感谢您的帮助,谢谢!

zazmityj

zazmityj1#

对于其他面临此问题的人,我找到了一个解决方法:
通过实现memorychannel和kafkasink,而不是将日志直接推到kafkachannel上,我可以看到消息在kafka主题的分区中得到了更均匀的平衡。

相关问题