UDP packet receive errors keep increasing when using the Logstash udp input with some filters and a PostgreSQL database output

uyhoqukh · posted 2022-12-09 in Logstash

I am handling a large volume of logs coming from routers and sending them to Logstash, which receives them through the udp input. A few filters are applied, and the extracted fields are finally inserted into a PostgreSQL database with the jdbc output. Here is the Logstash pipeline.

input{
    udp{
        port => 9002
        host => "10.10.10.10"
        queue_size => 25000
        workers => 8
        receive_buffer_bytes => 2119999999
    }
}

filter{
    grok{
        match => {
            "message" => "%{DATA}%{TIMESTAMP_ISO8601:Local_Time}%{ISO8601_TIMEZONE:NTP} %{HOSTNAME:Bras}%{DATA}: application:%{DATA:Application}, %{DATA} %{IPV4:Src_Nat_IP}:%{INT:Src_Nat_Port} \[%{IPV4:Src_IP}:%{INT:Src_Port}\]%{DATA}%{IPV4:Dst_IP}:%{INT:Dst_Port} \(%{DATA:Protocol}\)"
        }
        match => {
            "message" => "%{DATA}%{TIMESTAMP_ISO8601:Local_Time}%{ISO8601_TIMEZONE:NTP} %{HOSTNAME:Bras}%{DATA}: application:%{DATA:Application}, %{DATA} %{IPV4:Src_IP}:%{INT:Src_Port}%{DATA}%{IPV4:Dst_IP}:%{INT:Dst_Port} \(%{DATA:Protocol}\)"
        }
    }
    if ! [Src_Nat_IP] or ! [Src_Nat_Port]{
        mutate{
            add_field => ["Where","from outside"]
        }
    }
    else{
        mutate{
            add_field => ["Where","from inside"]
        }
    }
    mutate{
        gsub => [ "Local_Time", "T", " " ]
        remove_field => ["host", "NTP", "log", "event"]
    }
    date{
        match => [ "Local_Time", "yyyy-mm-dd HH:mm:ss" ]
    }
    if [Application] == "none"{
        mutate{
            remove_field => "Application"
        }
    }
}

output{
    jdbc{
        connection_string => 'jdbc:postgresql://127.0.0.1:6432/postgres?user=username&password=password'
        max_pool_size => 999
        statement => [ "INSERT INTO todat_test (local_time, bras, application, src_nat_ip, src_nat_port, src_ip, src_port, dst_ip, dst_port) VALUES (CAST (? AS timestamp), ?, ?, CAST (? AS inet), CAST (? AS integer),CAST (? AS inet), CAST (? AS integer), CAST (? AS inet), CAST (? AS integer))", "Local_Time", "Bras", "Application", "Src_Nat_IP", "Src_Nat_Port", "Src_IP", "Src_Port", "Dst_IP", "Dst_Port"]
    }
}

The server runs CentOS 7 with a 16-core CPU and 16 GB of RAM. I have also increased the default buffer sizes in both Logstash and the operating system itself, but the UDP packet receive errors keep growing. How should I handle these logs? Around 30 million lines are generated per hour. I have also set up pgbouncer to pool the PostgreSQL connections; however, that did not help either.
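
(For reference, the OS-side buffer increase mentioned above is usually done with sysctl, and the receive-error counter from the title can be watched with netstat; a minimal sketch, where the values are placeholders rather than the ones actually used on this server:)

# Watch the "packet receive errors" counter in the Udp section
netstat -su

# Raise the kernel receive-buffer limits so the udp input's
# receive_buffer_bytes setting can actually take effect
# (example values only)
sysctl -w net.core.rmem_max=67108864
sysctl -w net.core.rmem_default=67108864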
My question:
How can I process all of the logs the system receives and get all of the data into the database? Please suggest where to optimize.


qfe3c7zg (answer 1)

If those grok patterns, with their multiple occurrences of %{DATA}, fail to match, the overhead is very, very expensive. grok has to start looking for the timestamp at every character in the message, and once that matches it has to check every subsequent character for the IP, and so on.
Break your patterns up. You could try

grok {
    break_on_match => false
    match => {
        "message" => [
            "%{TIMESTAMP_ISO8601:Local_Time}%{ISO8601_TIMEZONE:NTP} %{HOSTNAME:Bras}",
            "application:%{DATA:Application},"
        ]
    }
}
grok {
    match => {
        "message" => [
            " %{IPV4:Src_Nat_IP}:%{INT:Src_Nat_Port} \[%{IPV4:Src_IP}:%{INT:Src_Port}\]%{DATA}%{IPV4:Dst_IP}:%{INT:Dst_Port} \(%{DATA:Protocol}\)",
            " %{IPV4:Src_IP}:%{INT:Src_Port}%{DATA}%{IPV4:Dst_IP}:%{INT:Dst_Port} \(%{DATA:Protocol}\)"
        ]
    }
}

If the ip:port (proto) is at the end of the line, then extract

%{IPV4:Dst_IP}:%{INT:Dst_Port} \(%{DATA:Protocol}\)$

and add it as an additional pattern to the first grok. That removes the remaining DATA field.
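
Put together, the first grok with that extra pattern appended could look like the following (a sketch of the suggestion above, not tested against the actual log lines):

# First grok, with the end-anchored destination pattern appended as a third
# pattern; break_on_match => false makes grok apply every pattern in the list.
grok {
    break_on_match => false
    match => {
        "message" => [
            "%{TIMESTAMP_ISO8601:Local_Time}%{ISO8601_TIMEZONE:NTP} %{HOSTNAME:Bras}",
            "application:%{DATA:Application},",
            "%{IPV4:Dst_IP}:%{INT:Dst_Port} \(%{DATA:Protocol}\)$"
        ]
    }
}

Because break_on_match is false, the end-anchored destination pattern is evaluated on every event, and the second grok is left to deal only with the source address part.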
