我有一个flink应用程序,需要消耗500多个主题(每个主题1个Kafka分区)。500个主题中的消息按键范围划分。所以键1-100在topicA中,101-200在topicB中,以此类推。这是一个有状态的应用程序,所以我想保持状态大小较低。如何确保来自每个主题的消息只由一组固定的确定性任务管理器处理。
List<String> topics = Arrays.asList("topic.A","topic.B",...);
KafkaSource<TestEvent> source = KafkaSource.<TestEvent>builder()
.setBootstrapServers(brokers)
.setTopics(topics)
.setGroupId("testconsumer")
.setStartingOffsets(OffsetsInitializer.earliest())
.setDeserializer(new TestDeserializationSchema())
.build();
1条答案
按热度按时间3xiyfsfu1#
500个主题中的消息由键范围划分。所以键1-100在topicA中,101-200在topicB中,依此类推。
这不是Kafka分区的工作方式;你应该使用一个主题和500个分区。但是Flink可以订阅许多主题,就像任何其他Kafka消费者一样。
确保来自每个主题的消息仅由一组固定的确定性任务管理器处理
Flink应用程序将形成一个消费者组,因此将自动分发自己,其中每个示例将从总分配的主题订阅中消耗分区的子集。