我有一个带有历史记录的入口数据库表,它记录数据库事件,比如插入更新和删除。我有一个生产者将是多线程。这个生产者将阅读历史表,找到哪一个表和哪一行选择,然后该行将被添加到Kafka主题。现在,生产者需要确保事件以历史记录表登录的方式添加到kafka主题中。因此,使用者读取它们的顺序与它记录在历史表中的顺序相同,并在postgrase db上执行它。
我可以把这些数据提供给多个生产商。例子
Producer1 has message 1 to 5
producer2 has message 6 to 10
producer3 has message 11 to 15
但当我消费的时候,我得到的信息是关于
messageId 1
messageId 2
messageId 3
messageId 6
messageId 7
messageId 11
等等
我想按下面的顺序收到所有信息
messageId 1
messageId 2
messageId 3
messageId 4
messageId 5
messageId 6
messageId 7
messageId 8
messageId 9
等等
注意:-我有一个主题、一个分区和一个使用者
3条答案
按热度按时间6vl6ewon1#
根据google的建议,如果您使用一个同步发布者(producer)和一个订阅者,请遵循页面后半部分节点js代码中的算法,以保证处理顺序。
类似地,如果您有多个发布服务器,则需要通过在getpublishcountervalue方法和setpublishcountervalue方法之间设置一个关键部分来同步发布服务器,这会破坏发布服务器的多线程特性。
最好的解决办法是遵循
最后结果的顺序很重要
典型用例:日志、状态更新
多线程发布者必须为每个发布/子事件消息附加一个时间戳,以便订阅者可以将事件消息作为实体存储在google云数据存储或firestore中。单独的事件消息处理器cron作业可以以时间戳排序的方式检索事件消息的实体,以强制消息排序。
oxf4rvwz2#
kafka不保证发送数据时的顺序,因为默认情况下每个主题有几个分区,如果没有密钥,消息会随机分配到分区。在下游,每个分区都可以独立地使用。
如果您需要保证插入和使用顺序,那么需要将kafka主题配置为仅使用1个分区。这是保证Kafka秩序的唯一途径。但是,您将失去kafka的许多优点,它是跨多个服务器、核心等分布的高性能产品。
kmbjn2e33#
您最多可以保持消息的顺序与生产者通过发送到单个分区所创建的顺序相同。kafka分区保证了消息的使用顺序,即在分区中创建消息的顺序。
在您的场景中,消息是由多个生产者生成的,它们不同步以按顺序用消息填充分区。所以不可能像你所期望的那样在消费端完成订单。