如何编写logstash过滤器来过滤kafka主题

2nbm6dog  于 2021-06-07  发布在  Kafka
关注(0)|答案(3)|浏览(741)

我想用kafka作为输入,logstash作为输出。我将饲料几个主题到logstash,并希望根据主题过滤。我试着这样写代码:

input {
    kafka {
        bootstrap_servers => "localhost:9092"
        topics => ["test", "payment"]
      }
}

filter {
    if [topic] = "test" {
       //do something
    } else {
       //do something
    }
}

但似乎不管用。

k5ifujac

k5ifujac1#

你应该加上 decorate_events 添加 kafka 现场。
选项将kafka元数据(如主题、消息大小)添加到事件中。这将向包含以下属性的logstash事件添加一个名为kafka的字段:topic:此消息与使用者组关联的主题:用于在此事件分区中读取的使用者组:此消息与offset关联的分区:与此消息与key关联的分区的偏移量:bytebuffer包含消息键
https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html#plugins-输入-Kafka-卡努事件
然后像这样更新conf

input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics => ["test", "payment"]
  }
}

filter {
  if [kafka][topic] = "test" {
   //do something
  } else {
   //do something
  }
}
yvfmudvl

yvfmudvl2#

所以以前的两个答案都是正确的,但都是不正确的。
1添加 decorate_events :

input {
        kafka {
            bootstrap_servers => "localhost:9092"
            topics => ["test", "payment"]
            decorate_events => true
        }
    }

寻找 [kafka][topic] 不是为了 [@metadata][kafka][topic] :

filter {
      if [kafka][topic] = "test" {
       //do something
      } else {
       //do something
      }
    }
yshpjwxd

yshpjwxd3#

通过添加更改输入部件 decorate_events 添加 kafka 现场。

input {
    kafka {
        bootstrap_servers => "localhost:9092"
        topics => ["test", "payment"]
        decorate_events => true
    }
}

更换过滤器部件如下:

filter {
    if [@metadata][kafka][topic] == "test" {
        //do something
    } else {
        //do something
  }
}

相关问题