Elasticsearch以非顺序的顺序处理来自Logstash的传入事件/消息

u4dcyp6a  于 2023-11-15  发布在  Logstash
关注(0)|答案(1)|浏览(244)

我们有一个系统,可以将数据从MongoDB同步到Elasticsearch。以下是关键组件:

  1. MongoDB源连接器(Kafka connect connector):该组件从MongoDB操作日志中读取事件并生成关于Kafka主题的消息。
  2. Logstash:Logstash从Kafka接收这些消息并将其发送给Elasticsearch。我们已经为Logstash配置了特定的设置,包括pipeline.workers:1和pipeline.ordered:true,以确保事件按照接收的顺序处理。
  3. ElasticSearch:1个节点集群,索引只有一个主分片。

使用的ELK协议栈版本8.7.1
**问题:**我们有一个用例,在mongoDB中以顺序方式执行以下操作:

1.创建文档A
1.更新文件A
1.删除文件A
虽然我们可以看到这些操作在Logstash中按顺序处理,但我们遇到了一个问题:文档A的删除没有反映在Elasticsearch中。Elasticsearch中文档上的_version是3,表明所有3个事件都被执行。这表明删除操作可能在更新操作之前处理,导致文档A在MongoDB中被删除后仍保留在Elasticsearch中。
logstash管道如下:

input {
    kafka {
        id => "my_plugin_id"
        group_id => "logstash"
        bootstrap_servers => "broker:29092"
        topics => ["topic"]
        auto_offset_reset => "earliest"
        consumer_threads => 1
    }
}

filter {
    json {
        source => "message"
        target => "message"
        add_field => { "mongoId" => "%{[message][documentKey][_id][$oid]}" }
    }
}

output {
    if [message][operationType] == "delete" {
        elasticsearch {
        hosts => "http://es01:9200"
        user => "elastic"
        password => "changeme"
        index => "index_name"
        document_id => "%{[mongoId]}"
        action => "delete"
        }
    }
    else {
        elasticsearch {
        hosts => "http://es01:9200"
        index => "index_name"
        document_id => "%{[mongoId]}"
        user => "elastic"
        password => "changeme"
        pipeline => "index_pipeline"
        }
    }
}

字符串

备注:正如上面的配置中所提到的,对于 delete 以外的其他操作,我们使用一个摄取管道来重构要索引的文档数据。document_id被设置为mongold。

一个假设,尽管有待验证:据我所知,Elasticsearch输出插件(用于logstash管道)使用批量API将数据发送到ElasticSearch。事件是否可能在单个批处理中处理的(子请求)不遵守严格的顺序,即删除可能正在运行在更新事件之前(或完成),因此,ElasticSearch中最终可见的文档对应于更新操作。刷新索引设置为默认值,即1次/秒。此外,我增加了pipeline.batch.delay(logstash pipeline config)增加到5000 ms(默认为50 ms),以确保所有事件都在同一批中。

gcmastyq

gcmastyq1#

将pipeline. worker设置为1并将ordered设置为true,可以确保Logstash管道逐个处理文档。事件也会逐个到达输出层。
但是,由于您有两个不同的elasticsearch输出插件,它们都独立运行,并且每个插件都可以在任何任意时刻发送其批处理,这取决于缓冲区填充了多少或距离上次发送已有多长时间。
因此,在您的情况下可能会发生的情况是,首先发送删除批处理,并且不执行任何操作,因为没有具有该ID的文档,然后发送创建/更新批处理,有效地创建文档。
你可能应该做的是在filter部分中将临时action字段(例如[@metadata][action])的值设置为indexupdatedelete,并在单个elasticsearch输出中使用该值,如下所示:

output {
    elasticsearch {
        hosts => "http://es01:9200"
        user => "elastic"
        password => "changeme"
        index => "index_name"
        document_id => "%{[mongoId]}"
        action => "%{[@metadata][action]}"
        pipeline => "index_pipeline"
    }
}

字符串
您的管道将以批量级别发送,因此它不会对删除操作产生任何影响,因此应该不会有任何问题。如果有问题,则只需从输出中删除pipeline设置,并直接在索引设置中将其设置为index.default_pipeline属性。
试试看

相关问题