我正在处理一个问题,但我找不到任何答案来解决它,在flume文档中也没有。我想获取尾随文件的绝对路径并保存它。之后,我需要将它作为密钥传递给kafka sink,以便在同一分区中具有相同路径的所有事件。我读过很多文章,说这是可能的,但我找不到配置分配,以使其工作。有人能给我一些关于如何配置代理的参考或例子吗?
我有以下代理配置:
source
agent3a.sources.source3a.type = TAILDIR
agent3a.sources.source3a.filegroups= f1
agent3a.sources.source3a.filegroups.f1 = /path/local/data/source/
agent3a.sources.source3a.channels= channel3a
agent3a.sources.source3a.batchSize=1
agent3a.sources.source3a.fileHeader= True
Memory Channel
agent3a.channels.channel3a.type = memory
agent3a.channels.channel3a.transactionCapacity = 100
agent3a.channels.channel3a.capacity = 100
Sink
agent3a.sinks.sink3a.type= org.apache.flume.sink.kafka.KafkaSink
agent3a.sinks.sink3a.kafka.bootstrap.servers =########hostname1#####:port, ########hostname2#####:port
agent3a.sinks.sink3a.kafka.topic= topic_test
agent3a.sinks.sink3a.kafka.producer.acks=1
agent3a.sinks.sink3a.channel= channel3a
agent3a.sinks.sink3a.FlumeBatchSize=1
agent3a.sinks.sink3a.useFlumeEventFormat = true
agent3a.sinks.sink3a.kafka.producer.batch.size=10
秋明:)
1条答案
按热度按时间zaq34kh61#
最后,我找到了如何配置代理,以便使用数据源中文件的绝对路径在kafka主题中为分区分配密钥。更详细地说,需要设置属性“fileheaderkey=key”。这样,当事件被传递给kafka sink时,头包含pairs key=absolute/path/of/the/文件,kafka可以将其用作其消息中的键。