我有我处理的输入数据流。每个流以数据块的形式发送。在处理完同一流i的第n个数据块之后,我只能处理流i的第n+1个数据块。因此,并行化可以通过一次处理多个流来实现,但是我不能在多个worker上拆分一个流。
一个流的块按顺序添加到队列中(尽管可以同时添加多个流的块)。
大多数消息队列,如rabbitmq,在多个worker对一个队列进行操作时保证有序传递。然而,为了实现我想要的行为,我需要将每个队列的worker数限制为1,这样下一个块总是在前一个块完成时才被处理。为了并行化,我可以为每个流创建一个队列,或者为每个工作线程创建一个队列,并使用另一个进程将流重定向到工作线程队列。事实上,每个工作者一个队列的方法就是我现在所做的,使用rabbitmq的一致散列和铲。当然,就负载平衡和工人数量的动态伸缩而言,这还远远不够理想。
我读了很多关于Kafka的书,以及它是如何为时间序列数据(比如日志)设计的。然而,我不知道如何应用Kafka或其他消息队列来解决我的问题。
我将非常感谢一些关于如何最好地使用消息队列来解决我的问题的提示。
2条答案
按热度按时间8ulbf1ek1#
您可以使用kafka,但是您必须使用一些流标识来对生产者端的消息进行散列,以便来自一个流的消息总是到达同一个分区。
然后,在使用者端,您必须使用低级使用者生成与分区数量相同的消耗线程,其中每个线程将从单个分区消耗。
这意味着您总是在每个流中按顺序处理消息。
我还没有查看kafka 0.9 producer是如何工作的,但是有一些更改,所以如果您想使用最新版本,您可能应该查看这些更改。
ef1yzkbh2#
为什么不在收到前一个区块的交付确认之后才推送下一个区块给工人呢?或者前一个chuck由worker处理的某种标志,标志设置为true,然后推送下一个chunk。
如果需要并行化工作,请创建几个具有唯一路由键的队列,根据路由键将块推送到相应的队列。并且每个路由密钥都有单独的标志。