(logstash) Only index specific data from a Kafka input into Elasticsearch

Asked by dgjrabp2 on 2021-06-07, in Kafka

I'm using Kafka as an input and sinking the events into Elasticsearch (output):

input {
    kafka {
        topics => ["maxwell"]
        codec => json
    }
}
filter {
}
output {
    stdout { codec => rubydebug }
    elasticsearch {
        index => 'test_kafka'
        document_type => "%{table}"
        hosts => 'localhost:9200'
    }
}
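For quick local testing without a Kafka broker, one hedged variant (an assumption, not part of the original setup) is to feed the same JSON event through stdin instead, keeping the rest of the pipeline unchanged:

```
input {
    stdin {
        codec => json
    }
}
output {
    stdout { codec => rubydebug }
}
```

Pasting one JSON event per line into the running Logstash process then shows exactly what the filters produce.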

When it runs, it outputs the following JSON:

{
  "database": "my_db",
  "xid": 88935,
  "@timestamp": "2016-11-14T12:00:13.763Z",
  "data": {
    "contact_country_code": null,
    "contact_type_id": 1,
    "created": "2014-10-03 12:24:36",
    "modified_by": null,
    "modified": "2014-10-03 12:24:36",
    "contact_id": 1,
    "is_default": 0,
    "created_by": null,
    "contact_number": "1241222232"
  },
  "old": {
    "contact_number": "1241222"
  },
  "commit": true,
  "@version": "1",
  "type": "update",
  "table": "contact",
  "ts": 1479124813
}

My question is: how can I index only the contents of the `data` key into Elasticsearch, while keeping the dynamic document type, so that the stored document looks like this?

{
  "_index": "test_kafka",
  "_type": "contact",
  "_id": "AVhitY804rvpX8qdVt9d",
  "_score": 1,
  "_source": {
    "contact_country_code": null,
    "contact_type_id": 1,
    "created": "2014-10-03 12:24:36",
    "modified_by": null,
    "modified": "2014-10-03 12:24:36",
    "contact_id": 1,
    "is_default": 0,
    "created_by": null,
    "contact_number": "1241222232"
  }
}

Answer #1, by ebdffaop

You can add a ruby filter to massage your event, as shown below. It first saves the `table` field into a `@metadata` sub-field so it can still be referenced in the elasticsearch output. It then removes every field except `data`, copies the sub-fields of `data` up to the top level of the event, and finally deletes the `data` field itself.

input {
    kafka {
        topics => ["maxwell"]
        codec => json
    }
}
filter {
  mutate {
     add_field => { "[@metadata][type]" => "%{table}" }
  }
  ruby {
     code => "
        # Keep only the 'data' field, then hoist its sub-fields to the
        # top level of the event. Use only the variant that matches your
        # Logstash version; leave the other commented out.

        # Ruby code for Logstash 2.x (the event is backed by a plain hash):
        # event.to_hash.delete_if {|k, v| k != 'data'}
        # event.to_hash.update(event['data'].to_hash)
        # event.to_hash.delete_if {|k, v| k == 'data'}

        # Ruby code for Logstash 5.x and later (to_hash returns a copy
        # there, so mutate the event through its get/set/remove API):
        event.to_hash.each_key {|k| event.remove(k) unless k == 'data'}
        event.get('data').each {|k, v| event.set(k, v)}
        event.remove('data')
     "
  }
}
output {
    stdout { codec => rubydebug }
    elasticsearch {
        hosts => 'localhost:9200'
        index => 'test_kafka'
        document_type => "%{[@metadata][type]}"
    }
}
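To make the reshaping concrete, here is a hypothetical standalone sketch (plain Ruby, not Logstash code) of the same transformation, with an ordinary Hash standing in for the event; all field values are taken from the sample Maxwell payload above:

```ruby
# Sketch: what the filter chain leaves behind for the elasticsearch output.
require 'json'

event = {
  'database' => 'my_db',
  'table'    => 'contact',
  'type'     => 'update',
  'ts'       => 1479124813,
  'data'     => {
    'contact_id'     => 1,
    'contact_number' => '1241222232'
  }
}

doc_type = event['table']   # what add_field copies into [@metadata][type]
source   = event['data']    # what survives as the document _source

puts doc_type               # => contact
puts JSON.generate(source)
```

The `@metadata` field is never sent to Elasticsearch, which is why the `table` value must be stashed there: it can drive `document_type` without leaking into `_source`.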
