Logstash,年度索引,在创建之前获取错误查询

zd287kbt  于 2023-11-15  发布在  Logstash
关注(0)|答案(1)|浏览(253)

好的,我们有一个logstash管道,我们每小时查询一个http轮询器来获取上周的数据。
在我们将数据保存到ES索引之前,我们过滤了以前在索引上的数据,丢弃了那些事件。
最后,我们存储新事件。
现在我们希望管道每年创建一个新的索引,这样我们就可以有序地管理数据,所以我们在输出中添加了一个action =>“create”和一个模板,这样如果ii不存在,索引就会被创建。
这是我们的管道:

input {
http_poller {
    urls => {
        air_quality => {
            url => "https://url_to_data"
            method => get
            headers => {
                Accept => "application/csv"
            }
        }
    }
    schedule => { every => "1h" }
    codec => line { charset => "UTF-8" } 
}

字符串
}

filter {
#If event starts by Station it's first line and we ignore it
if [message] =~ /^Station/ {
    drop { }
} 
#We tell fieldnames to get from CSV.
csv {
    skip_header => "true"
    columns => ["station", "title", "latitude", "longitude", "date", "period", "SO2", "NO", "NO2", "CO", "PM10", "O3", "dd", "vv", "TMP", "HR", "PRB", "RS", "LL", "BEN", "TOL", "MXIL", "PM25"]
    separator => ","
}

#We assign format and timezone to date
date {
    match => ["[date]", "yyyy-MM-dd"]
    timezone => "Europe/Madrid"
    target => "[date]"  
}

#We scape ":" from timezone 
mutate { add_field => { "eventDate" => "%{[fecha]}" } }

#And add the scaped character
mutate { gsub => [ "eventDate", ":", "\:" ] } 

#We check if there is a repeated event on the index, so we drop it.
elasticsearch {
    hosts => "https://our_host_url"
    ca_file => "our_http_ca.crt"
    api_key => "our_API_KEY"
    index => "our-base-inex-name-%{+YYYY}"
    query => "period:%{[period]} AND station:%{[station]} AND date:%{[eventDate]}"
}

#Drop repeated events
if [@metadata][total_hits] != 0 {
    drop {}
}

#Rename latitude and longitude 
mutate {
    rename => {
        "latitude" => "[situation][lat]"
        "longitude" => "[situation][lon]"
    }
}


}

output {
elasticsearch {
    hosts => "https://our_host_url"
    cacert => "our_http_ca.crt"
    api_key => "our_API_KEY"
    index => "our-base-index-name-%{+YYYY}"
    action => "create"
    template_name => "out-base-template-name"
}


}
所以现在我们收到404警告,我们的输出查询试图检查尚未创建的新索引中的先前数据。
我认为我们应该能够通过模板创建索引,然后在过滤器上检查查询,但不知道如何做到这一点。
然而,令我惊讶的是,日志不显示错误,它显示WARNINGS,如果我在过滤器中注解查询部分,系统工作正常并创建索引,如果我再次激活它,对不存在的索引的查询失败(带WARNINGS),索引永远不会创建。我怀疑这是因为WARNINGS避免了所有事件被触发,并且由于没有事件被触发,输出永远不会到达。
对此有什么建议吗?

tvmytwxo

tvmytwxo1#

_search API中有一个不返回错误的选项,叫做ignore_unavailable。默认值是false,这意味着如果查询的索引不存在,请求将返回错误。但是,elasticsearch过滤器插件没有提供任何方法将此参数设置为true。
由于另一个名为allow_no_indices的参数默认为true,因此不必查询确切的索引名称,技巧是在名称中添加一个“”,如our-base-index-name-%{+YYYY}*,这样即使索引不存在也不会出错。或者只是our-base-index-name-*,因为您的查询已经确定了数据的正确日期范围。

相关问题