我已经读过官方文件了,没办法。
我给es的数据来自Kafka,有时会出问题。过去,kafka的消息是解析的,直接插入或更新特定id的es doc,为了避免旧的数据覆盖新的数据,我需要检查特定id的doc是否已经存在,这个doc的一些属性是否满足条件。然后我执行更新操作(或插入)。
我现在做的是“更新前搜索”。
在更新文档之前,我使用特定的id(包括在kafka msg中)从es中搜索。然后检查此文档是否满足条件(例如,更新时间是否较旧?)。最后我更新了文档。然后我开始 refresh
至 true
立即更新索引。
我担心什么?
这似乎是交易。
如果只有一个线程同步执行,那么在我处理下一条消息时,上一条消息进程中更新的文档是否有可能不在es刷新?
如果我有几个线程消费Kafka消息,如何在更新前检查?我可以用脚本来解决这个问题吗?
2条答案
按热度按时间bgibtngc1#
如果只有一个线程同步执行,那么在我处理下一条消息时,上一条消息进程中更新的文档是否有可能不在es刷新?
公元1年。可以在elasticsearch中保存数据,并在收到过时的结果后不久(在更新索引之前)保存数据
如果我有几个线程消费Kafka消息,如何在更新前检查?我可以用脚本来解决这个问题吗?
公元2年。如果您在多个线程中处理kafka消息,最好使用业务数据(例如某些业务id)作为kafka中的分区键,以确保按顺序处理数据。请记住使用kafka在多个线程中使用消息,不要使用单个使用者的消息,以便稍后分散到多个线程。
似乎最好是确保数据按顺序处理,然后在elasticsearch中放弃检查,因为它不能保证提供有效的结果。
s8vozzvw2#
如果只有一个线程同步执行,那么在我处理下一条消息时,上一条消息进程中更新的文档是否有可能不在es刷新?
这是一种可能性,因为索引每秒钟刷新一次(默认情况下),减少此值既不建议也不能保证提供所需的结果,因为elasticsearch不是为此而设计的。
如果我有几个线程消费Kafka消息,如何在更新前检查?我可以用脚本来解决这个问题吗?
如果要更新的字段数量非常有限,则可以使用脚本。就我个人而言,我发现脚本最适合于单字段更新,而且对于角落用例,它也不应该被用作一般实践。如果超过这一点,您将面临与rdbms世界中存储过程相同的风险。它使得数据管理总体上不稳定,而且从长远来看,系统更难维护/扩展。
您的用例最适合elasticsearch提供的乐观锁定支持。请查看elasticsearch版本控制支持以获取完整的详细信息。
如果并发性是您需要解决的唯一问题,那么您可以很好地使用内置的doc版本。但是,如果您需要的不仅仅是并发性(无序的消息传递和相应的es更新),那么您应该使用应用程序/域特定的字段,因为内置版本无法正常工作。
您可以很好地使用任何特定于应用程序的(数字)字段作为版本字段,并在文档更新期间使用它进行乐观锁定。如果您使用这种方法,请特别注意该索引的所有插入、更新、删除操作。引用来自版本控制支持的原样-使用外部版本控制时,请确保始终将当前版本(和版本类型)添加到任何索引、更新或删除调用中。如果您忘记了,elasticsearch将使用其内部系统来处理该请求,这将导致版本错误地递增
我建议您首先评估内置版本,如果它满足您的需要,就使用它。这会使整个设计简单得多。如果内置版本不符合您的要求,请考虑将特定于应用程序的版本作为第二个选项。