最终目标:连接elasticsearch和kafka,并接收es索引中的持续更改事件到kafka。从Kafka,我有听众做进一步的处理。
方法:我使用logstash输入和输出插件。这是配置代码。
input {
elasticsearch {
hosts => ["localhost:9200"]
index => "liferay-20116"
}
}
output {
kafka {
topic_id => "elastic-topic-index"
codec => json
}
}
这是工作,但有一个奇怪的问题。
当我听Kafka的时候,它读取了所有来自es的文档,现在大约有176个文档。
一旦它读了,它会停下来一段时间,比如说2秒钟,然后再次读取整个176个文档!
我不知道是什么问题,这是因为罗格藏行为还是Kafka行为怪异?
任何帮助都将不胜感激!
2条答案
按热度按时间8iwquhpp1#
我没用logstash就搞清楚了。
我正在使用elasticsearch插件,它会在每次更改时作为web套接字发出事件。
参考文献:https://codeforgeek.com/2017/10/elasticsearch-change-feed/
azpvetkf2#
这是这个插件的标准行为-它将数据匹配推送到给定的查询。如果您只想更改文档,唯一的解决方法是建立您自己更改内容的知识—例如,您需要为条目设置时间戳,然后将此知识合并到发送给es的查询中。