我们有一个系统,可以将数据从MongoDB同步到Elasticsearch。以下是关键组件:
- MongoDB源连接器(Kafka connect connector):该组件从MongoDB操作日志中读取事件并生成关于Kafka主题的消息。
- Logstash:Logstash从Kafka接收这些消息并将其发送给Elasticsearch。我们已经为Logstash配置了特定的设置,包括pipeline.workers:1和pipeline.ordered:true,以确保事件按照接收的顺序处理。
- ElasticSearch:1个节点集群,索引只有一个主分片。
使用的ELK协议栈版本:8.7.1
**问题:**我们有一个用例,在mongoDB中以顺序方式执行以下操作:
1.创建文档A
1.更新文件A
1.删除文件A
虽然我们可以看到这些操作在Logstash中按顺序处理,但我们遇到了一个问题:文档A的删除没有反映在Elasticsearch中。Elasticsearch中文档上的_version
是3,表明所有3个事件都被执行。这表明删除操作可能在更新操作之前处理,导致文档A在MongoDB中被删除后仍保留在Elasticsearch中。
logstash管道如下:
input {
kafka {
id => "my_plugin_id"
group_id => "logstash"
bootstrap_servers => "broker:29092"
topics => ["topic"]
auto_offset_reset => "earliest"
consumer_threads => 1
}
}
filter {
json {
source => "message"
target => "message"
add_field => { "mongoId" => "%{[message][documentKey][_id][$oid]}" }
}
}
output {
if [message][operationType] == "delete" {
elasticsearch {
hosts => "http://es01:9200"
user => "elastic"
password => "changeme"
index => "index_name"
document_id => "%{[mongoId]}"
action => "delete"
}
}
else {
elasticsearch {
hosts => "http://es01:9200"
index => "index_name"
document_id => "%{[mongoId]}"
user => "elastic"
password => "changeme"
pipeline => "index_pipeline"
}
}
}
字符串
备注:正如上面的配置中所提到的,对于 delete 以外的其他操作,我们使用一个摄取管道来重构要索引的文档数据。document_id
被设置为mongold。
一个假设,尽管有待验证:据我所知,Elasticsearch输出插件(用于logstash管道)使用批量API将数据发送到ElasticSearch。事件是否可能在单个批处理中处理的(子请求)不遵守严格的顺序,即删除可能正在运行在更新事件之前(或完成),因此,ElasticSearch中最终可见的文档对应于更新操作。刷新索引设置为默认值,即1次/秒。此外,我增加了pipeline.batch.delay
(logstash pipeline config)增加到5000 ms(默认为50 ms),以确保所有事件都在同一批中。
1条答案
按热度按时间gcmastyq1#
将pipeline. worker设置为1并将ordered设置为true,可以确保Logstash管道逐个处理文档。事件也会逐个到达输出层。
但是,由于您有两个不同的
elasticsearch
输出插件,它们都独立运行,并且每个插件都可以在任何任意时刻发送其批处理,这取决于缓冲区填充了多少或距离上次发送已有多长时间。因此,在您的情况下可能会发生的情况是,首先发送删除批处理,并且不执行任何操作,因为没有具有该ID的文档,然后发送创建/更新批处理,有效地创建文档。
你可能应该做的是在filter部分中将临时action字段(例如
[@metadata][action]
)的值设置为index
,update
或delete
,并在单个elasticsearch
输出中使用该值,如下所示:字符串
您的管道将以批量级别发送,因此它不会对删除操作产生任何影响,因此应该不会有任何问题。如果有问题,则只需从输出中删除
pipeline
设置,并直接在索引设置中将其设置为index.default_pipeline
属性。试试看