如何在Logstash HTTP输出中Map消息内的数组

ctzwtxfj  于 2023-03-06  发布在  Logstash
关注(0)|答案(1)|浏览(170)

我正在使用Logstash通过查询现有Elasticsearch文档来更新,其中包含从Potgresql表中提取的聚合值的附加字段。我使用elastichsearch输出加载一个索引,使用document_id和http输出更新另一个具有不同document_id但收到错误的索引:
[2023-02- 08 T17:58:12,086][错误][日志存储.输出.http ][主要][b64 f19821 b11 ee 0 df 1bd 165920785876 cd 6c 5 fab 079 e27 d39 bb 7 ee 19 a3 d 642 a4] [HTTP输出失败]遇到非2xx HTTP代码400 {:响应代码=〉400,:url=〉“http://localhost:9200/medico/_update_by_query”,:事件=〉#日志存储::事件:0x 19 a14 c 08}
这是我的管道配置:

input {
    jdbc {
        # Postgres jdbc connection string to our database, mydb
        jdbc_connection_string => "jdbc:postgresql://handel:5432/mydb"
        statement_filepath => "D:\ProgrammiUnsupported\logstash-7.15.2\config\nota_sede.sql"
    }
}

filter {
    aggregate {
        task_id => "%{idCso}"
        code => "
            map['idCso'] = event.get('idCso')
            map['noteSede'] ||= []
            map['noteSede'] << {
                'id' => event.get('idNota'),
                'tipo' => event.get('tipoNota'),
                'descrizione' => event.get('descrizione'),
                'data' => event.get('data'),
                'dataInizio' => event.get('dataInizio'),
                'dataFine' => event.get('dataFine')
            }
            event.cancel()"
        push_previous_map_as_event => true
        timeout => 60
        timeout_tags => ['_aggregatetimeout']       
    }
   }
}

output {

    stdout { codec => rubydebug { metadata => true } }

#       this works
    elasticsearch {
        hosts => "https://localhost:9200"
        document_id => "STRUTTURA_%{idCso}" 
        index => "struttura"
        action => "update"
        user => "user"
        password => "password"
        ssl => true
        cacert => "/usr/share/logstash/config/ca.crt"   
    }
    
    http {
        url => "http://localhost:9200/medico/_update_by_query"
        user => "elastic"
        password => "changeme"
        http_method => "post"
        format => "message"
        content_type => "application/json"
        message => '{
                        "query":{
                            "term":{
                                "idCso":"%{idCso}"
                            }
                        },
                        "script":{
                            "source":"ctx._source.noteSede=params.noteSede",
                            "lang":"painless",
                            "params":{
                                "noteSede":"%{noteSede}"
                                }
                            }
                        }
                    }'
    }
}

stdout输出显示了发送到输出的文档,如下所示:

{
     "query" => {
        "term" => {
            "idCso" => "859119"
        }
    },
    "script" => {
        "source" => "ctx._source.noteSede=params.noteSede",
        "lang" => "painless",
        "params" => {
            "noteSede" => "{dataFine=null, dataInizio=2020-02-13, descrizione=?, tipo=DB, id=6390644, data=2020-02-13 12:26:58.409},{dataFine=null, dataInizio=2020-02-13, descrizione=?, tipo=DE, id=6390645, data=2020-02-13 12:26:58.41}"
        }
        }
    }
}

如何将消息中的noteSede数组字段设置为_update_by_query?

dsf9zpds

dsf9zpds1#

我发现了使用ruby代码设置参数数组和设置http输出格式为json的技巧。可能是代码优化,但它工作!

ruby {
    code => '
        temp = event.get("noteSede")
        note = {"noteSede" => temp}
        event.set("script", "params" => note)
    '
}
mutate {
    add_field => {
        "[script][lang]" => "painless"
        "[query][term][idSede]" => '%{idSede}'
        "[script][source]" => "ctx._source.noteSede = params.noteSede"
    }
    remove_field => ["tags", "idSede", "noteSede", "@version", "@timestamp"]            
}

再见

相关问题