使用json字段值作为带有filebeat的kafka主题的记录键

bybem2ql  于 2021-06-05  发布在  Kafka
关注(0)|答案(1)|浏览(1246)

我在文件中有一个事件(json消息),需要通过filebeat发送给kafka。json消息如下所示:

{"time":1582213700.001,"interval":"2s","worker":11,"application":"1.1.1.1"}

我想把这个信息发给Kafka。partion键应该是json事件消息中的应用程序字段。如何在json消息中提供自定义应用程序字段作为kafka记录的partion键?
filebeat.yml如下:

…
        output.kafka:
          version: 0.10.1
          hosts: ["{KAFKA1}:9092", "{KAFKA2}:9092", "{KAFKA3}:9092"]
          topic: '%{[log_topic]}'
          codec.format:
            string: '%{[message]}'
          key: '%{[message.application]:default}'
          partition.hash:
            hash: []
            random: true # if false non-hashable events will be dropped
          required_acks: 1
          compression: none

https://www.elastic.co/guide/en/beats/libbeat/6.8/config-file-format-type.html#_format_string_sprintf 根据这个引用,我们可以使用string spec格式引用事件字段值。
使用此配置时,默认消息“default”始终报告为key。如何配置filebeat.yml以提取自定义应用程序字段并将此信息用作kafka分区密钥?
此外,我尝试在输入部分定义一个字段,如下所示:

type: log
  enabled: true
  paths:
  - /var/logs/*event.log
  fields:
    log_topic: "event"
    application: '%{[application]} string'
  fields_under_root: true

相应的Kafka输出为:

output.kafka:
  version: 0.10.1
  hosts: ["{KAFKA1}:9092", "{KAFKA2}:9092", "{KAFKA3}:9092"]
  topic: '%{[log_topic]}'
  codec.format:
    string: '%{[message]}'
  key: '%{[application]:default}'

  partition.hash:
    hash: []
    random: true # if false non-hashable events will be dropped
  required_acks: 1

但是kafka分区键总是:%{[application]}string

lhcgjxsq

lhcgjxsq1#

假设json真的被解析了,我想你应该

key: '%{[fields.application]:default}'

参见示例-https://www.elastic.co/guide/en/beats/filebeat/6.8/kafka-output.html#topic-Kafka选项
您可能还对添加主机元数据感兴趣-https://www.elastic.co/guide/en/beats/filebeat/6.8/add-host-metadata.html
解码json-https://www.elastic.co/guide/en/beats/filebeat/6.8/decode-json-fields.html

相关问题