我收到的事件最终在Kafka。从这些事件中,我使用kafka streams应用程序获取id,并将其作为另一个主题中的一对(id,1)发布回kafka。然后我想看看这个id是否已经存在于elasticsearch中,如果存在的话,更新它的计数器,否则在elasticsearch中创建一个新的记录,id从kafka和counter设置为1,即记录(id,1)的上游插入到es。
我希望使用Kafka连接到ElasticSearch这个,但似乎不是那么简单,如果可能的话。我可以看到向es添加记录是可行的,但是与现有记录合并似乎是我还没有发现的事情。这已经有可能了吗?如果有,如何实现?如果没有,计划在附近的版本中实现吗?
1条答案
按热度按时间arknldoa1#
我分叉datamountaineer es sink连接器以允许upsert。使用它,您可以指定一个pk,并在es中运行docasupsert更新。您可以从我的github fork获取项目并编译jar。