我在文件中有一个事件(json消息),需要通过filebeat发送给kafka。json消息如下所示:
{"time":1582213700.001,"interval":"2s","worker":11,"application":"1.1.1.1"}
我想把这个信息发给Kafka。partion键应该是json事件消息中的应用程序字段。如何在json消息中提供自定义应用程序字段作为kafka记录的partion键?
filebeat.yml如下:
…
output.kafka:
version: 0.10.1
hosts: ["{KAFKA1}:9092", "{KAFKA2}:9092", "{KAFKA3}:9092"]
topic: '%{[log_topic]}'
codec.format:
string: '%{[message]}'
key: '%{[message.application]:default}'
partition.hash:
hash: []
random: true # if false non-hashable events will be dropped
required_acks: 1
compression: none
https://www.elastic.co/guide/en/beats/libbeat/6.8/config-file-format-type.html#_format_string_sprintf 根据这个引用,我们可以使用string spec格式引用事件字段值。
使用此配置时,默认消息“default”始终报告为key。如何配置filebeat.yml以提取自定义应用程序字段并将此信息用作kafka分区密钥?
此外,我尝试在输入部分定义一个字段,如下所示:
type: log
enabled: true
paths:
- /var/logs/*event.log
fields:
log_topic: "event"
application: '%{[application]} string'
fields_under_root: true
相应的Kafka输出为:
output.kafka:
version: 0.10.1
hosts: ["{KAFKA1}:9092", "{KAFKA2}:9092", "{KAFKA3}:9092"]
topic: '%{[log_topic]}'
codec.format:
string: '%{[message]}'
key: '%{[application]:default}'
partition.hash:
hash: []
random: true # if false non-hashable events will be dropped
required_acks: 1
但是kafka分区键总是:%{[application]}string
1条答案
按热度按时间lhcgjxsq1#
假设json真的被解析了,我想你应该
参见示例-https://www.elastic.co/guide/en/beats/filebeat/6.8/kafka-output.html#topic-Kafka选项
您可能还对添加主机元数据感兴趣-https://www.elastic.co/guide/en/beats/filebeat/6.8/add-host-metadata.html
解码json-https://www.elastic.co/guide/en/beats/filebeat/6.8/decode-json-fields.html