Logstash: Nginx log pipeline with ELK Stack + Kafka

Asked by f4t66c6m on 2022-12-09 in Logstash

Hi, I am trying to send nginx logs, which are in JSON format, via Filebeat into Kafka, then into Logstash, then into Elasticsearch, and finally visualize them in Kibana.
My nginx log format, which is JSON, looks like this:

'{"@timestamp":"$time_iso8601","host":"$hostname",'
                            '"server_ip":"$server_addr","client_ip":"$remote_addr",'
                            '"xff":"$http_x_forwarded_for","domain":"$host",'
                            '"url":"$uri","referer":"$http_referer",'
                            '"args":"$args","upstreamtime":"$upstream_response_time",'
                            '"responsetime":"$request_time","request_method":"$request_method",'
                            '"status":"$status","size":"$body_bytes_sent",'
                            '"request_body":"$request_body","request_length":"$request_length",'
                            '"protocol":"$server_protocol","upstreamhost":"$upstream_addr",'
                            '"file_dir":"$request_filename","http_user_agent":"$http_user_agent"'
                            '}'
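
For reference, this format string would normally be declared with a log_format directive in nginx.conf and wired to an access_log. A minimal sketch (the format name json_access is an assumption; the log path is taken from the Filebeat event in the error output further down; escape=json requires nginx >= 1.11.8 and keeps the output valid JSON when values contain quotes):

http {
    # "json_access" is an assumed name; only a few of the fields above are repeated here
    log_format json_access escape=json
        '{"@timestamp":"$time_iso8601","host":"$hostname",'
        '"client_ip":"$remote_addr","status":"$status",'
        '"responsetime":"$request_time","http_user_agent":"$http_user_agent"}';

    # log path as reported by Filebeat in the error log below
    access_log /var/log/nginx/new_json.log json_access;
}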

My Logstash conf file:

input {
  kafka {
    bootstrap_servers => "10.10.10.240:9092"
    topics => ["payments-nginx"]
    codec => json
  }
}
filter {
geoip {
      source => "client_ip"
      remove_field => [ "[geoip][location][lon]", "[geoip][location][lat]" ]
    }
useragent {
    source => "http_user_agent"
    target => "ua"
    remove_field => [ "[ua][minor]","[ua][major]","[ua][build]","[ua][os_name]" ]
  }
mutate {
        remove_field => [ "xff", "args", "request_body","http_user_agent" ]
      }
}
output {
  elasticsearch {
    hosts => ["localhost:9202"]
    manage_template => false
    index => "payments-nginx-%{+YYYY.MM.dd}"
  }
#}
#stdout { codec => rubydebug }
}

Problem: I am not able to see fields like status, response time, and client_ip in Kibana. The JSON shows up inside the message field, but the fields are not visible separately, because Logstash is not able to parse the client_ip field; the geographic location lookup is also failing. The Logstash logs do not show an error, yet the events are still not parsed. Any solution or reason for this?
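
(Not part of the original question.) Since Filebeat ships the raw nginx JSON line as a plain string inside the message field, one common way to lift those keys (status, responsetime, client_ip, ...) to the top level in Logstash is a json filter on that field. A minimal sketch, assuming the event layout shown in the error log below:

filter {
  json {
    # the original nginx JSON line as forwarded by Filebeat
    source => "message"
    # optionally nest the parsed keys under a sub-field instead of the event root:
    # target => "nginx"
  }
}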
Filebeat version 7.10.1, Logstash version 7.10.1, Elasticsearch version 7.10.0.
I have enabled debug logs for Logstash:

on filter {:event=>#<LogStash::Event:0x23a9bc8d>}
[2021-01-19T22:13:09,546][DEBUG][logstash.filters.json    ][main][4b26c54f59b453b33ce3542b7d139d33aaf31bfcf8185cddaf9983d6fdac068e] Running json filter {:event=>#<LogStash::Event:0x2143ba96>}
[2021-01-19T22:13:09,546][DEBUG][logstash.filters.json    ][main][4b26c54f59b453b33ce3542b7d139d33aaf31bfcf8185cddaf9983d6fdac068e] Event after json filter {:event=>#<LogStash::Event:0x2143ba96>}
[2021-01-19T22:13:09,547][DEBUG][logstash.filters.json    ][main][4b26c54f59b453b33ce3542b7d139d33aaf31bfcf8185cddaf9983d6fdac068e] Running json filter {:event=>#<LogStash::Event:0x6558b0f4>}
[2021-01-19T22:13:09,547][DEBUG][logstash.filters.json    ][main][4b26c54f59b453b33ce3542b7d139d33aaf31bfcf8185cddaf9983d6fdac068e] Event after json filter {:event=>#<LogStash::Event:0x6558b0f4>}
[2021-01-19T22:13:09,547][DEBUG][logstash.filters.json    ][main][4b26c54f59b453b33ce3542b7d139d33aaf31bfcf8185cddaf9983d6fdac068e] Running json filter {:event=>#<LogStash::Event:0x651bef38>}
[2021-01-19T22:13:09,547][DEBUG][logstash.filters.json    ][main][4b26c54f59b453b33ce3542b7d139d33aaf31bfcf8185cddaf9983d6fdac068e] Event after json filter {:event=>#<LogStash::Event:0x651bef38>}
[2021-01-19T22:13:09,873][DEBUG][org.apache.kafka.clients.FetchSessionHandler][main][24653f85c4bf579b2041409263ccfa549df5a7dcabc6d18b8484a1e85b9bfa97] [Consumer clientId=logstash-0, groupId=logstash] Node 0 sent an incremental fetch response for session 636759397 with 0 response partition(s), 2 implied partition(s)
[2021-01-19T22:13:09,873][DEBUG][org.apache.kafka.clients.consumer.internals.Fetcher][main][24653f85c4bf579b2041409263ccfa549df5a7dcabc6d18b8484a1e85b9bfa97] [Consumer clientId=logstash-0, groupId=logstash] Added READ_UNCOMMITTED fetch request for partition payments-nginx-1 at position FetchPosition{offset=26718738, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=mbkkafka240:9092 (id: 0 rack: null), epoch=-1}} to node mbkkafka240:9092 (id: 0 rack: null)
[2021-01-19T22:13:09,874][DEBUG][org.apache.kafka.clients.consumer.internals.Fetcher][main][24653f85c4bf579b2041409263ccfa549df5a7dcabc6d18b8484a1e85b9bfa97] [Consumer clientId=logstash-0, groupId=logstash] Added READ_UNCOMMITTED fetch request for partition payments-nginx-0 at position FetchPosition{offset=26715192, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=mbkkafka240:9092 (id: 0 rack: null), epoch=-1}} to node mbkkafka240:9092 (id: 0 rack: null)
[2021-01-19T22:13:09,874][DEBUG][org.apache.kafka.clients.FetchSessionHandler][main][24653f85c4bf579b2041409263ccfa549df5a7dcabc6d18b8484a1e85b9bfa97] [Consumer clientId=logstash-0, groupId=logstash] Built incremental fetch (sessionId=636759397, epoch=5580) for node 0. Added 0 partition(s), altered 0 partition(s), removed 0 partition(s) out of 2 partition(s)
[2021-01-19T22:13:09,874][DEBUG][org.apache.kafka.clients.consumer.internals.Fetcher][main][24653f85c4bf579b2041409263ccfa549df5a7dcabc6d18b8484a1e85b9bfa97] [Consumer clientId=logstash-0, groupId=logstash] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), implied=(payments-nginx-1, payments-nginx-0)) to broker mbkkafka240:9092 (id: 0 rack: null)
[2021-01-19T22:13:09,875][DEBUG][org.apache.kafka.clients.NetworkClient][main][24653f85c4bf579b2041409263ccfa549df5a7dcabc6d18b8484a1e85b9bfa97] [Consumer clientId=logstash-0, groupId=logstash] Using older server API v10 to send FETCH {replica_id=-1,max_wait_time=500,min_bytes=1,max_bytes=52428800,isolation_level=0,session_id=636759397,session_epoch=5580,topics=[],forgotten_topics_data=[]} with correlation id 5963 to node 0
Error parsing json {:source=>"message", :raw=>{"event"=>{"module"=>"nginx", "dataset"=>"nginx.access", "timezone"=>"+05:30"}, "log"=>{"offset"=>17947386523, "file"=>{"path"=>"/var/log/nginx/new_json.log"}}, "host"=>{"name"=>"mbkapp57", "os"=>{"version"=>"7 (Core)", "name"=>"CentOS Linux", "platform"=>"centos", "family"=>"redhat", "kernel"=>"3.10.0-327.13.1.el7.x86_64", "codename"=>"Core"}, "architecture"=>"x86_64", "containerized"=>false, "ip"=>["10.10.10.82"], "id"=>"92278feb4ab04110b9b72833eb1bf548", "mac"=>["22:77:85:f2:da:96", "d2:ea:9f:33:51:5c", "aa:d2:1d:af:62:49", "ba:c5:e6:af:17:ce"], "hostname"=>"mbkapp57"}, "message"=>"{\"@timestamp\":\"2021-01-20T00:05:31+05:30\",\"host\":\"mbkapp57\",\"server_ip\":\"10.10.10.82\",\"client_ip\":\"45.119.57.164\",\"xff\":\"-\",\"domain\":\"appapi.mobikwik.com\",\"url\":\"/p/upgradewallet/v3/kycConsent\",\"referer\":\"-\",\"args\":\"-\",\"upstreamtime\":\"0.026\",\"responsetime\":\"0.026\",\"request_method\":\"GET\",\"status\":\"200\",\"size\":\"129\",\"request_body\":\"-\",\"request_length\":\"425\",\"protocol\":\"HTTP/1.1\",\"upstreamhost\":\"10.10.10.159:8080\",\"file_dir\":\"/usr/share/nginx/html/p/upgradewallet/v3/kycConsent\",\"http_user_agent\":\"Mobikwik/22 CFNetwork/1209 Darwin/20.2.0\"}", "@timestamp"=>"2021-01-19T18:35:31.410Z", "agent"=>{"version"=>"7.10.1", "ephemeral_id"=>"2d565807-b19c-40c2-a092-e66ee5dd4f9b", "name"=>"mbkapp57", "type"=>"filebeat", "hostname"=>"mbkapp57", "id"=>"c9d8d92d-31be-4996-99d8-aae6680c1570"}, "fileset"=>{"name"=>"access"}, "ecs"=>{"version"=>"1.5.0"}, "@metadata"=>{"beat"=>"filebeat", "version"=>"7.10.1", "pipeline"=>"filebeat-7.10.1-nginx-access-pipeline", "type"=>"_doc"}, "input"=>{"type"=>"log"}, "service"=>{"type"=>"nginx"}}, :exception=>java.lang.ClassCastException}
[root@es02 logstash]#
Answer (by 9rnv2umw):

I changed my approach by switching the Nginx log pattern to the following format:

log_format custom  '$remote_addr [$time_local] '
                           '$request $status $body_bytes_sent '
                           '$http_referer $http_user_agent $host $upstream_addr $request_time';
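
For illustration, a line produced by this format, reconstructed from the field values in the JSON example of the question's error log (not an actual log line from the post), would look roughly like:

45.119.57.164 [20/Jan/2021:00:05:31 +0530] GET /p/upgradewallet/v3/kycConsent HTTP/1.1 200 129 - Mobikwik/22 CFNetwork/1209 Darwin/20.2.0 appapi.mobikwik.com 10.10.10.159:8080 0.026

This is the shape the grok pattern in the configuration below has to match.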

In addition, the Logstash configuration parses the nginx logs coming from Kafka and converts them into JSON by applying a grok pattern. Logstash conf file:

input {
  kafka {
    bootstrap_servers => "10.X.X.X:9092"
    topics => ["payments"]
  }
}
filter {
   grok {
      match => { "message" => "%{IPORHOST:remote_ip} \[%{HTTPDATE:time}\] %{WORD:http_method} %{DATA:url} HTTP/%{NUMBER:http_version} %{NUMBER:response_code} %{NUMBER:body_sent_bytes} %{DATA:referrer} %{DATA:agent} %{IPORHOST:domain} %{NOTSPACE:upstream_address} %{NUMBER:upstream_time}" }
        }
   date {
        match => [ "time", "dd/MMM/YYYY:H:m:s Z" ]
        target => "my_date_as_date"
      }
   geoip {
      source => "remote_ip"
      add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
      add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}"  ]
    }
   mutate {
      add_field => { "host_nginx" => "X.X.X.82" }
      add_field => { "read_time" => "%{@timestamp}" }
      convert => ["body_sent_bytes", "integer"]
      convert => ["upstream_time", "float"]
      remove_field => [ "message", "[geoip][location][lat]", "[geoip][location][lon]", "[geoip][country_code3]" ]
}
}
output {
   elasticsearch {
    hosts => ["localhost:9200"]
    manage_template => false
    index => "paymentapi-nginx-%{+YYYY.MM.dd}"
  }
}
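
To verify that the grok, date, and geoip filters produce the expected fields before the documents reach Elasticsearch, a temporary stdout output can sit next to the elasticsearch output (the question's original config has the same line commented out). A minimal sketch:

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    manage_template => false
    index => "paymentapi-nginx-%{+YYYY.MM.dd}"
  }
  # temporary: print every parsed event to the Logstash log; remove once the fields look right in Kibana
  stdout { codec => rubydebug }
}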
