我使用ELK(ElasticSearch,kibana,logstash,filebeat)来收集日志。我有一个日志文件,有以下几行,每行都有一个json,我的目标是使用Logstash Grok来取出json中的键/值对,并将其转发给ElasticSearch。
2018-03-28 13:23:01 charge:{"oldbalance":5000,"managefee":0,"afterbalance":"5001","cardid":"123456789","txamt":1}
2018-03-28 13:23:01 manage:{"cuurentValue":5000,"payment":0,"newbalance":"5001","posid":"123456789","something":"new2","additionalFields":1}
我正在使用Grok Debugger制作正则表达式模式并查看结果。我当前的正则表达式是:
%{TIMESTAMP_ISO8601} %{SPACE} %{WORD:$:data}:{%{QUOTEDSTRING:key1}:%{BASE10NUM:value1}[,}]%{QUOTEDSTRING:key2}:%{BASE10NUM:value2}[,}]%{QUOTEDSTRING:key3}:%{QUOTEDSTRING:value3}[,}]%{QUOTEDSTRING:key4}:%{QUOTEDSTRING:value4}[,}]%{QUOTEDSTRING:key5}:%{BASE10NUM:value5}[,}]
可以看出,这是硬编码的,因为真实的log中json中的键可以是任意的字,值可以是整型、双精度型或字符串型,而且键的长度是可变的。所以我的解决方案是不可接受的。我的解决方案结果如下,仅供参考。我使用的是Grok patterns。
我的问题是,尝试在json中提取键是否明智,因为ElasticSearch也使用json?其次,如果我尝试从json中提取键/值,是否存在正确、简洁的Grok模式?
Grok模式当前结果在解析上述行中的第一行时给予以下输出。
{
"TIMESTAMP_ISO8601": [
[
"2018-03-28 13:23:01"
]
],
"YEAR": [
[
"2018"
]
],
"MONTHNUM": [
[
"03"
]
],
"MONTHDAY": [
[
"28"
]
],
"HOUR": [
[
"13",
null
]
],
"MINUTE": [
[
"23",
null
]
],
"SECOND": [
[
"01"
]
],
"ISO8601_TIMEZONE": [
[
null
]
],
"SPACE": [
[
""
]
],
"WORD": [
[
"charge"
]
],
"key1": [
[
""oldbalance""
]
],
"value1": [
[
"5000"
]
],
"key2": [
[
""managefee""
]
],
"value2": [
[
"0"
]
],
"key3": [
[
""afterbalance""
]
],
"value3": [
[
""5001""
]
],
"key4": [
[
""cardid""
]
],
"value4": [
[
""123456789""
]
],
"key5": [
[
""txamt""
]
],
"value5": [
[
"1"
]
]
}
第二次编辑
是否可以使用Logstash的Json过滤器?但在我的情况下,Json是行/事件的一部分,而不是整个事件是Json。
第三版
我没有看到更新的解决方案函数很好地解析json。我的正则表达式如下:
filter {
grok {
match => {
"message" => [
"%{TIMESTAMP_ISO8601}%{SPACE}%{GREEDYDATA:json_data}"
]
}
}
}
filter {
json{
source => "json_data"
target => "parsed_json"
}
}
它没有key:value对,而是msg+json字符串。未解析已解析的json。
测试数据如下:
2018-03-28 13:23:01 manage:{"cuurentValue":5000,"payment":0,"newbalance":"5001","posid":"123456789","something":"new2","additionalFields":1}
2018-03-28 13:23:03 payment:{"cuurentValue":5001,"reload":0,"newbalance":"5002","posid":"987654321","something":"new3","additionalFields":2}
2018-03-28 13:24:07 management:{"cuurentValue":5002,"payment":0,"newbalance":"5001","posid":"123456789","something":"new2","additionalFields":1}
[2018-06-04T15:01:30,017][WARN ][logstash.filters.json ] Error parsing json {:source=>"json_data", :raw=>"manage:{\"cuurentValue\":5000,\"payment\":0,\"newbalance\":\"5001\",\"posid\":\"123456789\",\"something\":\"new2\",\"additionalFields\":1}", :exception=>#<LogStash::Json::ParserError: Unrecognized token 'manage': was expecting ('true', 'false' or 'null')
at [Source: (byte[])"manage:{"cuurentValue":5000,"payment":0,"newbalance":"5001","posid":"123456789","something":"new2","additionalFields":1}"; line: 1, column: 8]>}
[2018-06-04T15:01:30,017][WARN ][logstash.filters.json ] Error parsing json {:source=>"json_data", :raw=>"payment:{\"cuurentValue\":5001,\"reload\":0,\"newbalance\":\"5002\",\"posid\":\"987654321\",\"something\":\"new3\",\"additionalFields\":2}", :exception=>#<LogStash::Json::ParserError: Unrecognized token 'payment': was expecting ('true', 'false' or 'null')
at [Source: (byte[])"payment:{"cuurentValue":5001,"reload":0,"newbalance":"5002","posid":"987654321","something":"new3","additionalFields":2}"; line: 1, column: 9]>}
[2018-06-04T15:01:34,986][WARN ][logstash.filters.json ] Error parsing json {:source=>"json_data", :raw=>"management:{\"cuurentValue\":5002,\"payment\":0,\"newbalance\":\"5001\",\"posid\":\"123456789\",\"something\":\"new2\",\"additionalFields\":1}", :exception=>#<LogStash::Json::ParserError: Unrecognized token 'management': was expecting ('true', 'false' or 'null')
at [Source: (byte[])"management:{"cuurentValue":5002,"payment":0,"newbalance":"5001","posid":"123456789","something":"new2","additionalFields":1}"; line: 1, column: 12]>}
请检查结果:
1条答案
按热度按时间xkrw2x1b1#
可以使用
GREEDYDATA
将整个json块分配给一个单独的字段,这将为json数据创建一个单独的文件,
然后在
json_data
字段上应用json filter,如下所示: