我需要发送几批消息,并确保每个批中的所有消息在同一批中一起到达消费者。例如,假设我需要在5个批/组中发送400条消息,每个组将包含80条消息,并且需要在消费端的同一批中使用。我正在使用spark结构化流媒体来消费消息。我读过类似的问题,但我仍然对正确的方法感到困惑。制作人是否应该将所有信息(每批)放在一个列表中,然后将列表发送给Kafka?还有其他更好的办法吗?谢谢
yyyllmsg1#
这可以通过创建一个包含5个分区的主题来实现,这样您就可以向每个分区发送每种类型的批处理消息
ProducerRecord(java.lang.String topic, java.lang.Integer partition, K key, V value) Creates a record to be sent to a specified topic and partition
我们可以创建5个消费者,并将每个消费者分配给每个分区,但我不确定每个消费者poll()是否会一次提取该分区中的所有消息手动分区分配。给你,医生例如:如果进程正在维护与该分区相关联的某种本地状态(如本地磁盘键值存储),那么它应该只获取它在磁盘上维护的分区的记录。如果进程本身是高可用的,并且在失败时将重新启动(可能使用诸如yarn、mesos或aws设施之类的集群管理框架,或者作为流处理框架的一部分)。在这种情况下,kafka不需要检测故障并重新分配分区,因为消费进程将在另一台机器上重新启动。要使用这种模式,您只需调用assign(collection)和要使用的分区的完整列表,而不是使用subscribe订阅主题。
String topic = "foo"; TopicPartition partition0 = new TopicPartition(topic, 0); TopicPartition partition1 = new TopicPartition(topic, 1); consumer.assign(Arrays.asList(partition0, partition1));
1条答案
按热度按时间yyyllmsg1#
这可以通过创建一个包含5个分区的主题来实现,这样您就可以向每个分区发送每种类型的批处理消息
我们可以创建5个消费者,并将每个消费者分配给每个分区,但我不确定每个消费者poll()是否会一次提取该分区中的所有消息
手动分区分配。给你,医生
例如:如果进程正在维护与该分区相关联的某种本地状态(如本地磁盘键值存储),那么它应该只获取它在磁盘上维护的分区的记录。
如果进程本身是高可用的,并且在失败时将重新启动(可能使用诸如yarn、mesos或aws设施之类的集群管理框架,或者作为流处理框架的一部分)。在这种情况下,kafka不需要检测故障并重新分配分区,因为消费进程将在另一台机器上重新启动。
要使用这种模式,您只需调用assign(collection)和要使用的分区的完整列表,而不是使用subscribe订阅主题。