如何使用Kafka Streams处理顺序错误的事件

d6kp6zgx  于 2023-01-12  发布在  Apache
关注(0)|答案(4)|浏览(157)

我有一个应用程序,其中的事件发送Kafka主题的基础上,用户的行动,如用户登录,用户的中间行动(可选)和用户注销。每个事件都有一些信息,在一个事件对象沿着userId,例如登录事件有loginTime;添加注解有注解(中间操作)。类似地,注销事件有logoutTime。要求是在接收到每个用户的注销事件后,将所有这些事件的信息聚合到一个对象中,并将其发送到下游。
由于某些原因(网络延迟、多个事件生成器),事件可能不按顺序发生(用户注销事件可能在中间事件之前发生),因此问题是如何处理此类情况?我无法在收到用户注销事件后等待中间事件,因为中间事件是可选的,取决于用户的操作。
我认为这里唯一的选择是在收到用户注销事件后等待一段时间,如果在等待时间内收到中间事件,则处理中间事件并发送已处理事件,但再次不确定如何实现这一点。

kx7yvsdv

kx7yvsdv1#

Kafka不保证topic上的顺序,它保证了partition上顺序。一个主题可以有多个分区,因此使用您主题的每个使用者都将使用一个分区。这就是Kafka实现可伸缩性的方法。因此,您所遇到的是正常行为(它不是bug或与网络延迟或类似的东西有关).你能做的是确保你想要按顺序处理的所有消息都被发送到同一个分区。你可以通过设置分区数为1来实现,这是最愚蠢的方法。当你用生产者发送消息时,默认情况下Kafka会查看密钥,获取密钥的哈希值,然后通过哈希值知道应该在哪个分区发送消息。你可以确保对于所有消息,键是相同的。2这样所有的键的散列将是相同的,并且所有的消息将去到相同的分区。3同样,你可以实现自定义的分区程序,并覆盖默认的方式如何Kafka选择哪个分区消息将去。4在这种方式下,所有的消息将按顺序到达。5如果你不能做任何这样的动作,然后你会无序地接收事件,你将不得不想一种方法,如何无序地消费它们,但这不是与Kafka有关的问题。

ilmyapht

ilmyapht2#

如果您无法保留事件顺序(Logout将是最后一个事件),您可以使用Kafka Streams的ProcesorApi来实现您的要求。Kafka Streams DSL可以与Processor API合并(更多详细信息请点击此处)。
您可以有几个分区,但特定用户的所有事件必须发送到同一分区。
你必须实现自定义的处理器/转换器。你的处理器将把每个事件/活动放在状态存储中(将来自特定用户的所有事件聚集在同一个键下)。处理器API使您能够创建某种 * 调度程序 (标点符号)。您可以计划每隔X秒检查特定用户的事件。如果注销*是 * 很久以前 * 的事件,您获取所有事件/活动,并进行一些聚合,然后将结果发送到下游。

nzkunb0c

nzkunb0c3#

正如在其他回答中所说的,在Kafka中,顺序是在每个分区的基础上保持的。
既然你说的是用户事件,为什么不把UserID作为你的Kafka主题键呢?这样,所有与特定用户相关的事件总是有序的(假设它们是由一个生产者生产的)。

您应该确保(通过 * 设计 *)只有一个Kafka生成器将所有用户更改事件推送到给定主题。这样,您可以 * 避免 * 由于多个生成器而导致消息乱序。

在流中,你可能还想看看Kafka流中的窗口。例如,滚动窗口是不重叠的,并且大小固定。你可以聚合一段时间内的记录。
现在,您可能希望按时间戳(或者您说您有注销时间、登录时间等)对聚合进行排序,并相应地采取行动。

简单有效的解决方案

使用同步发送并将delivery.timeout.msretries设置为最大值。要确保容错,请将acks=all设置为min.insync.replicas=2(主题配置)并使用单个生成器推送到该主题。您还应该将max.block.ms设置为某个最大值,以便在获取元数据时出错时send()不会立即返回(例如,当Kafka倒下的时候)。
以您的速率为同步发送设定基准,并检查它是否满足您的要求或基准数。
这确保了先到的消息先被发送给Kafka,然后直到前一个消息被成功确认后才发送下一个消息。
如果您的基准数字没有达到,尝试使用一种反压机制,如内存中/持久队列。
1.将事件添加到Thread-1中的队列
1.从Thread-2的队列中取事件(不出队)
1.在线程2中调用producer.send(...).get()
1.将事件从线程2中出队

baubqpgj

baubqpgj4#

关键是让你的前端跟踪器发送有序的事件到后端服务,然后后端服务产生事件给Kafka。
您可以通过对事件进行批处理,并仅在先前的批处理事件成功交付后才将批处理事件发送到后端来实现这一点。

相关问题