Logstash: extracting data from different types of messages

wnvonmuf published 2021-06-13 in ElasticSearch

Below are 3 examples of the types of log lines I get from an automation platform. I want to extract the customOptions section. The challenge I run into is that the customOptions section can contain many entries. I think what I need to do is isolate the customOptions array and then dissect it. I have tried Logstash's dissect, grok, and mutate filters to get the data out.

2020-12-09_18:06:30.58027 executing local task [refId:3122, lockTimeout:330000, lockTtl:300000, jobType:jobTemplateExecute, lockId:job.execute.3122, jobTemplateId:3122, jobDate:1607537190133, userId:1897, customConfig:{"AnsibleRequestedUser":"testing1","AnsibleRequestedUserPassword":"VMware321!"}, jobTemplateExecutionId:5677, customInputs:[customOptions:[AnsibleRequestedUser:testing1, AnsibleRequestedUserPassword:VMware321!]], processConfig:[accountId:947, status:executing, username:user1, userId:1897, userDisplayName:user1 user1, refType:jobTemplate, refId:3122, timerCategory:TEST: 0.  Enterprise Create User, timerSubCategory:3122, description: Enterprise Create User], processMap:[success:true, refType:jobTemplate, refId:3122, subType:null, subId:null, process: : 25172, timerCategory:TEST: 0. OpenManage Enterprise Create User, timerSubCategory:3122, zoneId:null, processId:25172], taskConfig:[:],:@45eb737f]

2020-12-09_15:33:43.21913 executing local task [refId:3117, lockTimeout:330000, lockTtl:300000, jobType:jobTemplateExecute, lockId:job.execute.3117, jobTemplateId:3117, jobDate:1607528023018, userId:320, customConfig:null, jobTemplateExecutionId:5667, customInputs:[customOptions:[AnsibleIdentPoolDesc:asdf123, AnsibleIdentPoolCount:50, TrackingUseCase:Customer Demo/Training, AnsiblePoolName:asdf123]], processConfig:[accountId:2, status:executing, username:user@company.com, userId:320, userDisplayName:user, refType:jobTemplate, refId:3117, timerCategory:TEST: 2.  Enterprise - Create Identity Pool, timerSubCategory:3117, description:TEST: 2. Enterprise - Create Identity Pool], processMap:[success:true, refType:jobTemplate, refId:3117, subType:null, subId:null, process: : 25147, timerCategory:TEST: 2. Enterprise - Create Identity Pool, timerSubCategory:3117, zoneId:null, processId:25147], taskConfig:[:], :@21ff5f47]

2020-12-09_15:30:53.83030 executing local task [refId:3112, lockTimeout:330000, lockTtl:300000, jobType:jobTemplateExecute, lockId:job.execute.3112, jobTemplateId:3112, jobDate:1607527853230, userId:320, customConfig:null, jobTemplateExecutionId:5662, customInputs:[customOptions:[ReferenceServer:10629, ReferenceServerTemplateName:asdfasdf, TrackingUseCase:Internal Testing/Training, ReferenceServerTemplateDescription:asdfasdf]], processConfig:[accountId:2, status:executing, username:user@company.com, userId:320, userDisplayName:user, refType:jobTemplate, refId:3112, timerCategory:TEST: 1. Enterprise - Create Template From Reference Device, timerSubCategory:3112, description:TEST: 1. Enterprise - Create Template From Reference Device], processMap:[success:true, refType:jobTemplate, refId:3112, subType:null, subId:null, process: : 25142, timerCategory:TEST: 1. Enterprise - Create Template From Reference Device, timerSubCategory:3112, zoneId:null, processId:25142], taskConfig:[:],:@29ac1e41]

The following needs to be extracted from the messages above.

Message 1:
customOptions:[AnsibleRequestedUser:testing1, AnsibleRequestedUserPassword:VMware321!] should go into a new field. username:user1 needs to be in a field. timerCategory:TEST: 0.  Enterprise Create User needs to be in a field.
The rest of the data can stay in the original message field.

Message 2:
customOptions:[AnsibleIdentPoolDesc:asdf123, AnsibleIdentPoolCount:50, TrackingUseCase:Customer Demo/Training, AnsiblePoolName:asdf123] need to be split into separate fields. username:user@company.com needs to be a field. timerCategory:TEST: 2.  Enterprise - Create Identity Pool needs to be a field.

Message 3:
customOptions:[ReferenceServer:10629, ReferenceServerTemplateName:asdfasdf, TrackingUseCase:Internal Testing/Training, ReferenceServerTemplateDescription:asdfasdf] need to be split apart. username:user@company.com needs to be a field. timerCategory:TEST: 1. Enterprise - Create Template From Reference Device needs to be a field.

Keep in mind that the timerCategory will keep changing based on what the log outputs, but it should keep the same format as above. The customOptions will also keep changing: which automation is launched determines which (and how many) custom options appear, but again the format shown above stays the same. The username can be an email address or a generic name.
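To make the requirement concrete, here is a minimal Python sketch of the extraction being asked for (extract_fields is a hypothetical helper name; the regexes assume the bracketed key/value layout shown in the sample lines):

```python
import re

def extract_fields(message):
    """Pull the varying customOptions plus username and timerCategory
    out of one 'executing local task' log line (hypothetical helper)."""
    # customOptions is a bracketed key/value list whose keys change per run,
    # so capture everything up to the closing "]" and split it afterwards.
    opts_m = re.search(r"customOptions:\[([^\]]+)\]", message)
    options = {}
    if opts_m:
        for pair in opts_m.group(1).split(", "):
            key, _, value = pair.partition(":")
            options[key] = value
    # username and timerCategory values end at the next comma or bracket.
    user_m = re.search(r"username:([^,\]]+)", message)
    cat_m = re.search(r"timerCategory:([^,\]]+)", message)
    return {
        "customOptions": options,
        "username": user_m.group(1) if user_m else None,
        "timerCategory": cat_m.group(1) if cat_m else None,
    }
```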
Below are some Logstash filters I have tried with some success, but they do not handle the ever-changing nature of the log messages.


# Testing a new method to get information from the logs.
# if "executing local task" in [message] and "beats" in [tags] {
#     dissect {
#         mapping => {
#             "message" => "%{date} %{?skip1} %{?skip2} %{?skip3} %{?refid} %{?lockTimeout} %{?lockTtl} %{?jobtemplate} %{?jobType} %{?jobTemplateId} %{?jobDate} %{?userId} %{?jobTemplateExecutionId} %{?jobTemplateExecutionId1} customInputs:[customOptions:[%{?RequestedPassword}:%{?RequestedPassword} %{?TrackingUseCase1}:%{TrackingUseCase}, %{?RequestedUser}, %{?processConfig}, %{?status}, username:%{username}, %{?userId}, %{?userDisplayName}, %{?refType}, %{?refID}, %{?timerCategory}:%{TaskName}, %{?timeCat}, %{?description}, %{?extra}"
#         }
#     }
# }

# Testing Grok Filters instead.

if "executing local task" in [message] and "beats" in [tags] {
    grok {
        match => { "message" => "%{YEAR:year}-%{MONTHNUM2:month}-%{MONTHDAY:day}_%{TIME:time}%{SPACE}%{CISCO_REASON}%{SYSLOG5424PRINTASCII}%{SPACE}%{NOTSPACE}%{SPACE}%{NOTSPACE}%{SPACE}%{PROG}%{SPACE}%{PROG}%{SPACE}%{PROG}%{SPACE}%{PROG}%{SPACE}%{PROG}%{SPACE}%{PROG}%{SPACE}%{PROG}%{SPACE}%{SYSLOGPROG}%{SYSLOG5424SD:testing3}%{NOTSPACE}%{SPACE}%{PROG}%{SYSLOG5424SD:testing2}%{NOTSPACE}%{SPACE}%{PROG}%{SYSLOG5424SD:testing}%{GREEDYDATA}}"
        }
    }   
}

I think grok is what I need to use, but I am not familiar with how to split out / add fields to meet the needs above.
Any help would be greatly appreciated.

jk9hmnmh1#

I would suggest not trying to do everything in a single filter, especially a single grok pattern. I would start by stripping off the timestamp with dissect. I keep it in a [@metadata] field so that it is accessible within the Logstash pipeline but will not be processed by outputs.

dissect { mapping => { "message" => "%{[@metadata][timestamp]} %{} [%{[@metadata][restOfline]}" } }
date { match => [ "[@metadata][timestamp]", "YYYY-MM-dd_HH:mm:ss.SSSSS" ] }
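For readers following along without a Logstash instance, the dissect mapping above amounts to "first space-separated token, then everything after the first [". A Python sketch of the same split (the sample line is truncated for brevity):

```python
from datetime import datetime

# One of the sample lines, truncated after the first bracketed field.
line = "2020-12-09_15:33:43.21913 executing local task [refId:3117, lockTimeout:330000]"

# dissect's "%{[@metadata][timestamp]} %{} [%{[@metadata][restOfline]}":
# take the first space-separated token as the timestamp, discard everything
# up to the first "[", and keep the remainder.
timestamp, remainder = line.split(" ", 1)
rest_of_line = remainder.split("[", 1)[1]

# The date filter's "YYYY-MM-dd_HH:mm:ss.SSSSS" corresponds roughly to:
parsed = datetime.strptime(timestamp, "%Y-%m-%d_%H:%M:%S.%f")
```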

Next I would break up [@metadata][restOfline] using grok patterns. If you only need the fields from processConfig then that is the only grok pattern you need. I have included the others as examples of how to pull multiple patterns out of one message.

grok {
        break_on_match => false
        match => {
            "[@metadata][restOfline]" => [
                "customOptions:\[(?<[@metadata][customOptions]>[^\]]+)",
                "processConfig:\[(?<[@metadata][processConfig]>[^\]]+)",
                "processMap:\[(?<[@metadata][processMap]>[^\]]+)"
            ]
        }
    }
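The three patterns are plain Oniguruma named captures; the same idea can be checked in Python, where grok's (?&lt;name&gt;...) becomes (?P&lt;name&gt;...) (trimmed sample input of assumed shape, not a full log line):

```python
import re

# A trimmed-down restOfline built from sample message 2 (assumed shape).
rest = ("refId:3117, customInputs:[customOptions:[AnsiblePoolName:asdf123, "
        "TrackingUseCase:Customer Demo/Training]], processConfig:[accountId:2, "
        "username:user@company.com]], processMap:[success:true, refId:3117]]")

# Same idea as the three grok patterns: after "<name>:[", capture every
# character that is not a closing bracket.
sections = {}
for name in ("customOptions", "processConfig", "processMap"):
    m = re.search(name + r":\[(?P<body>[^\]]+)", rest)
    if m:
        sections[name] = m.group("body")
```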

Now we can parse [@metadata][processConfig], which is a key/value string. Again we keep the parsed values in [@metadata] and copy over just the ones we want.

kv {
        source => "[@metadata][processConfig]"
        target => "[@metadata][processConfigValues]"
        field_split_pattern => ", "
        value_split => ":"
        add_field => {
            "username" => "%{[@metadata][processConfigValues][username]}"
            "timeCategory" => "%{[@metadata][processConfigValues][timerCategory]}"
         }
    }
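The kv filter splits pairs on the field_split_pattern and separates each key from its value at the first value_split character, which is what lets values like the timerCategory keep their inner colons. A rough Python equivalent (a sketch, not the filter's actual implementation):

```python
def parse_kv(text, field_split=", ", value_split=":"):
    """Rough Python equivalent of the kv filter above: pairs are separated
    by field_split, and each key ends at the FIRST value_split, so values
    such as 'TEST: 2.  Enterprise - Create Identity Pool' keep their colons."""
    result = {}
    for pair in text.split(field_split):
        key, sep, value = pair.partition(value_split)
        if sep:  # skip fragments with no key/value separator
            result[key] = value
    return result
```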

This results in events with fields such as

"username" => "user@company.com",
"timeCategory" => "TEST: 2.  Enterprise - Create Identity Pool"

inkz8wg92#

This is another answer based on grok (though I agree it was somewhat hard to maintain at the time, and is hard to understand now):
Extract the customOptions field with a correct (somewhat long) grok expression.
Process only that specific field with another filter (kv), for example putting the result into a customOptionsSplitter field (to avoid clobbering existing fields).
The code below is an implementation of this approach:

filter{

    grok {
        match => { "message" => "%{DATE:date}_%{TIME:time} %{CISCO_REASON} \[refId\:%{INT:refId}, lockTimeout:%{INT:lockTimeout}, lockTtl:%{INT:lockTtl}, jobType:%{NOTSPACE:jobType}, lockId:%{NOTSPACE:lockId}, jobTemplateId:%{INT:jobTemplateId}, jobDate:%{INT:jobDate}, userId:%{INT:userId}, customConfig:(\{%{GREEDYDATA:customConfig}\}|null), jobTemplateExecutionId:%{INT:jobTemplateExecutionId}, customInputs:\[customOptions:\[%{GREEDYDATA:customOptions}\]\], processConfig:\[%{GREEDYDATA:processConfig}\], processMap:\[%{GREEDYDATA:processMap}\], taskConfig:\[%{GREEDYDATA:taskConfig}\], :%{NOTSPACE:serial}\]"
        }
    }

    kv {
        source => "customOptions"
        target => "customOptionsSplitter"
        field_split_pattern => ", "
        value_split => ":"
    }

}
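End to end, the grok capture plus the kv split turn message 3's customOptions into a structured object. A simplified Python sketch of that result (using a [^\]]+ capture in place of GREEDYDATA; the customOptionsSplitter name follows the target field above):

```python
import re

# The customInputs portion of sample message 3.
message_tail = ("customInputs:[customOptions:[ReferenceServer:10629, "
                "ReferenceServerTemplateName:asdfasdf, TrackingUseCase:Internal Testing/Training, "
                "ReferenceServerTemplateDescription:asdfasdf]]")

# Step 1: capture the text between "customOptions:[" and the closing "]".
custom_options = re.search(r"customOptions:\[([^\]]+)\]", message_tail).group(1)
# Step 2: the kv split, ", " between pairs and the first ":" within a pair.
custom_options_splitter = dict(pair.split(":", 1) for pair in custom_options.split(", "))
```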
