在我的应用程序中,我们创建了许多流和许多消费者(每个流一个)。我正在尝试对所有流只使用一个消费者。我是这样做的:
Consumer
.committableExternalSource(
consumer, Subscriptions.assignment(new TopicPartition("topic", 0), group, 50.seconds)
)
这里的问题是我不想手动分配分区。我希望能够通过消费者,但使用Kafka的自动分区分配。所以我试着这样:
Consumer
.plainPartitionedSource(KafkaConfigurations.consumerSettings(group), Subscriptions.topics("topic"))
.map {
case (topicPartition, source) =>
Consumer
.committableExternalSource(consumer, Subscriptions.assignment(topicPartition), group, 50.seconds)
}.flatMapMerge(10, identity)
但是,我猜在使用 plainPartitionedSource
所以,问题是是否有可能使用这种或类似的方法来实现,或者我是否必须为这个用例创建一个不同的(定制的)实现。
暂无答案!
目前还没有任何答案,快来回答吧!