elasticsearch在单个字段中聚合一些值

kqlmhetl  于 2021-06-10  发布在  ElasticSearch
关注(0)|答案(1)|浏览(382)

我有一些原始数据

{
  {
    "id":1,
    "message":"intercept_log,UDP,0.0.0.0,68,255.255.255.255,67"
  },
  {
    "id":2,
    "message":"intercept_log,TCP,172.22.96.4,52085,239.255.255.250,3702,1:"
  },
  {
    "id":3,
    "message":"intercept_log,UDP,1.0.0.0,68,255.255.255.255,67"
  },
  {
    "id":4,
    "message":"intercept_log,TCP,173.22.96.4,52085,239.255.255.250,3702,1:"
  }
}

需求

我想根据消息的消息部分的值对数据进行分组。这样的产值

{
  {
    "GroupValue":"TCP",
    "DocCount":"2"
  },
  {
    "GroupValue":"UDP",
    "DocCount":"2"
  }
}

试试看

我试过这些代码,但失败了

GET systemevent*/_search
{
  "size": 0, 
  "aggs": {
    "tags": {
      "terms": {
        "field": "message.keyword",
        "include": " intercept_log[,,](.*?)[,,].*?"
      }
    }
  },
  "track_total_hits": true
}

现在我尝试使用管道来满足这个需求。
“aggs”似乎只对字段进行分组。
有人有更好的主意吗?

链接

术语聚合

更新

我的场景有点特别。我从许多不同的服务器收集日志,然后将日志导入es。因此,消息字段之间有很大的区别。如果直接使用脚本语句进行分组统计,将导致分组失败或分组不准确。我尝试根据条件筛选出一些数据,然后使用脚本对操作代码(注解代码1)进行分组,但此代码无法对正确的结果进行分组。
这是我要添加的场景:
我们的团队使用es分析服务器日志,使用rsyslog将数据转发到服务器中心,然后使用logstash将数据过滤并提取到es。此时,es中有一个名为message的字段,message的值是详细的日志信息。此时,我们需要计算消息中包含一些值的数据。

注解代码1

POST systemevent*/_search
{
  "size": 0, 
  "query": {
    "bool": {
      "must": [
        {
          "match_phrase": {
            "message": {
              "query": "intercept_log"
            }
          }
        }
      ]
    }
  },
  "aggs": {
    "protocol": {
      "terms": {
        "script": "def values = /,/.split(doc['message.keyword'].value); return values.length > 1 ? values[1] : 'N/A'",
        "size": 10
      }
    }
  },
  "track_total_hits": true
}

注解代码2

POST test2/_search
{
  "size": 0,
  "aggs": {
    "protocol": {
      "terms": {
        "script": "def values = /.*,.*/.matcher( doc['host.keyword'].value ); if( name.matches() ) {return values.group(1) } else { return 'N/A' }",
        "size": 10
      }
    }
  }
}
qvtsj1bj

qvtsj1bj1#

解决这个问题的最简单方法是利用 terms 聚合。脚本只需在逗号上拆分,然后取第二个值。

POST systemevent*/_search
    {
      "size": 0,
      "aggs": {
        "protocol": {
          "terms": {
            "script": "def values = /,/.split(doc['message.keyword'].value); return values.length > 1 ? values[1] : 'N/A';",
            "size": 10
          }
        }
      }
    }

使用正则表达式

POST test2/_search
{
  "size": 0,
  "aggs": {
    "protocol": {
      "terms": {
        "script": "def m = /.*proto='(.*?)'./.matcher(doc['message.keyword'].value ); if( m.matches() ) { return m.group(1) } else { return 'N/A' }"
      }
    }
  }
}

结果看起来

"buckets" : [
    {
      "key" : "TCP",
      "doc_count" : 2
    },
    {
      "key" : "UDP",
      "doc_count" : 2
    }
  ]


一个更好、更有效的方法是将 message 使用摄取管道或日志存储将字段转换为新字段。

相关问题