我想用kafka作为输入,logstash作为输出。我将饲料几个主题到logstash,并希望根据主题过滤。我试着这样写代码:
input {
kafka {
bootstrap_servers => "localhost:9092"
topics => ["test", "payment"]
}
}
filter {
if [topic] = "test" {
//do something
} else {
//do something
}
}
但似乎不管用。
3条答案
按热度按时间k5ifujac1#
你应该加上
decorate_events
添加kafka
现场。选项将kafka元数据(如主题、消息大小)添加到事件中。这将向包含以下属性的logstash事件添加一个名为kafka的字段:topic:此消息与使用者组关联的主题:用于在此事件分区中读取的使用者组:此消息与offset关联的分区:与此消息与key关联的分区的偏移量:bytebuffer包含消息键
https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html#plugins-输入-Kafka-卡努事件
然后像这样更新conf
yvfmudvl2#
所以以前的两个答案都是正确的,但都是不正确的。
1添加
decorate_events
:寻找
[kafka][topic]
不是为了[@metadata][kafka][topic]
:yshpjwxd3#
通过添加更改输入部件
decorate_events
添加kafka
现场。更换过滤器部件如下: