AWS load balancer logs from S3 to Elasticsearch

o4tp2gmn | posted 2021-06-14 | in: Elasticsearch

I have enabled ELB access logging to an S3 bucket, and I am trying to ship those logs from S3 to Elasticsearch with the Lambda script below.
The logs are stored in the bucket as *.log.gz files. How can I send these gzipped files to Elasticsearch as JSON documents?
I tried following https://docs.aws.amazon.com/elasticsearch-service/latest/developerguide/es-aws-integrations.html#es-aws-integrations-s3-lambda-es, with no luck.
If there is a better way to do this, please let me know.

import boto3
import re
import requests
from requests_aws4auth import AWS4Auth

region = '' # e.g. us-west-1
service = 'es'
credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token)

host = '' # the Amazon ES domain, including https://
index = 'lambda-s3-index'
type = 'lambda-type'
url = host + '/' + index + '/' + type

headers = { "Content-Type": "application/json" }  
s3 = boto3.client('s3')

# Regular expressions used to parse some simple log lines

ip_pattern = re.compile(r'(\d+\.\d+\.\d+\.\d+)')
time_pattern = re.compile(r'\[(\d+\/\w\w\w\/\d\d\d\d:\d\d:\d\d:\d\d\s-\d\d\d\d)\]')

message_pattern = re.compile(r'\"(.+)\"')

# Lambda execution starts here

def handler(event, context):
    for record in event['Records']:

        # Get the bucket name and key for the new file
        bucket = record['s3']['bucket']['name']
        key = record['s3']['object']['key']

        # Get, read, and split the file into lines
        obj = s3.get_object(Bucket=bucket, Key=key)
        body = obj['Body'].read()
        lines = body.splitlines()

        # Match the regular expressions to each line and index the JSON
        for line in lines:
            ip = ip_pattern.search(line).group(1)
            timestamp = time_pattern.search(line).group(1)
            message = message_pattern.search(line).group(1)

            document = { "ip": ip, "timestamp": timestamp, "message": message }
            r = requests.post(url, auth=awsauth, json=document, headers=headers)
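
For context, the step that appears to be missing from the script above is decompressing the *.log.gz body before parsing it. Below is a minimal, unverified sketch of that change, not a confirmed fix: it reuses the module-level s3 client, regex patterns, url, awsauth and headers defined above, and assumes a Python 3 runtime with UTF-8 log lines.

import gzip

def handler(event, context):
    for record in event['Records']:
        # Bucket name and key of the newly written log object
        bucket = record['s3']['bucket']['name']
        key = record['s3']['object']['key']

        # The object body is gzip-compressed bytes; decompress and decode it
        # so that splitlines() yields str lines the str regexes can match.
        obj = s3.get_object(Bucket=bucket, Key=key)
        body = gzip.decompress(obj['Body'].read()).decode('utf-8')

        for line in body.splitlines():
            ip = ip_pattern.search(line)
            timestamp = time_pattern.search(line)
            message = message_pattern.search(line)
            # ELB access-log lines do not always match these sample patterns,
            # so skip a line when any field is missing instead of raising.
            if not (ip and timestamp and message):
                continue
            document = {
                "ip": ip.group(1),
                "timestamp": timestamp.group(1),
                "message": message.group(1),
            }
            requests.post(url, auth=awsauth, json=document, headers=headers)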

ddhy6vgd1#

A better approach is to use Logstash with the S3 input plugin installed: it parses the ELB access logs and sends them to Elasticsearch.
logstash.conf:

input { 
  s3 {
    access_key_id => "..."
    secret_access_key => "..."
    bucket => "..."
    region => "eu-central-1"
    prefix => "dxlb/AWSLogs/.../elasticloadbalancing/eu-central-1/2019/09/"
    type   => "elb"
  }
}

filter {
  if [type] == "elb" {
    grok {
      match => [ "message", "%{WORD:connection} %{TIMESTAMP_ISO8601:timestamp} %{NOTSPACE:elb} %{IP:clientip}:%{INT:clientport:float} (?:(%{IP:backendip}:?:%{INT:backendport:int})|-) %{NUMBER:request_processing_time:float} %{NUMBER:backend_processing_time:float} %{NUMBER:response_processing_time:float} (?:-|%{INT:elb_status_code:int}) (?:-|%{INT:backend_status_code:int}) %{INT:received_bytes:int} %{INT:sent_bytes:int} \"%{ELB_REQUEST_LINE}\" \"(?:-|%{DATA:user_agent})\" (?:-|%{NOTSPACE:ssl_cipher}) (?:-|%{NOTSPACE:ssl_protocol})" ]
      #match => ["message", "%{ELB_ACCESS_LOG} \"%{DATA:userAgent}\"( %{NOTSPACE:ssl_cipher} %{NOTSPACE:ssl_protocol})?"]
    }
    date {
      match => [ "timestamp", "ISO8601" ]
    }
    geoip {
      source => "clientip"
    }
  }
}

output {
  if [type] == "elb" {
    elasticsearch {
      hosts => ["http://node:9200"]
      index => "logstash-%{+YYYY.MM}"
      user => "..."
      password => "..."
    }
  }
}
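
As a usage note (this describes standard Logstash tooling, not anything stated in the answer): if the S3 input plugin is not already bundled with your Logstash release, it can be installed with bin/logstash-plugin install logstash-input-s3, and the pipeline above is started with bin/logstash -f logstash.conf. Logstash then polls the bucket under the given prefix, the grok filter breaks each classic ELB access-log line into typed fields, the date and geoip filters enrich the event, and the elasticsearch output bulk-indexes everything into a monthly index, so no custom Lambda code is needed; as far as I know the S3 input also reads gzip-compressed objects on its own.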
