我使用Kafka作为输入,并将其放入elasticsearch(输出)
input {
kafka {
topics =>["maxwell"]
codec => json
}
}
filter {
}
output {
stdout { codec => rubydebug }
elasticsearch {
index => 'test_kafka'
document_type => "%{table}"
hosts => 'localhost:9200'
}
}
当它运行时,它输出以下json
{
"database": "my_db",
"xid": 88935,
"@timestamp": "2016-11-14T12:00:13.763Z",
"data": {
"contact_country_code": null,
"contact_type_id": 1,
"created": "2014-10-03 12:24:36",
"modified_by": null,
"modified": "2014-10-03 12:24:36",
"contact_id": 1,
"is_default": 0,
"created_by": null,
"contact_number": "1241222232"
},
"old": {
"contact_number": "1241222"
},
"commit": true,
"@version": "1",
"type": "update",
"table": "contact",
"ts": 1479124813
}
我的问题是,如何在elasticsearch中只提取动态文档类型的数据键来实现这一点
{
"_index": "test_kafka",
"_type": "contact",
"_id": "AVhitY804rvpX8qdVt9d",
"_score": 1,
"_source": {
"contact_country_code": null,
"contact_type_id": 1,
"created": "2014-10-03 12:24:36",
"modified_by": null,
"modified": "2014-10-03 12:24:36",
"contact_id": 1,
"is_default": 0,
"created_by": null,
"contact_number": "1241222232"
}
}
1条答案
按热度按时间ebdffaop1#
您可以添加
ruby
过滤按摩您的事件如下。它所做的是首先保存table
内部字段@metadata
字段,以便在elasticsearch
输出。然后删除除data
一个。然后它复制data
字段,最后删除data
现场。