在akka流中共享使用者,同时保留kafka的分区重新平衡功能

cedebl8k  于 2021-06-07  发布在  Kafka
关注(0)|答案(0)|浏览(221)

在我的应用程序中,我们创建了许多流和许多消费者(每个流一个)。我正在尝试对所有流只使用一个消费者。我是这样做的:

Consumer
  .committableExternalSource(
    consumer, Subscriptions.assignment(new TopicPartition("topic", 0), group, 50.seconds)
  )

这里的问题是我不想手动分配分区。我希望能够通过消费者,但使用Kafka的自动分区分配。所以我试着这样:

Consumer
  .plainPartitionedSource(KafkaConfigurations.consumerSettings(group), Subscriptions.topics("topic"))
  .map {
    case (topicPartition, source) =>
      Consumer
        .committableExternalSource(consumer, Subscriptions.assignment(topicPartition), group, 50.seconds)
  }.flatMapMerge(10, identity)

但是,我猜在使用 plainPartitionedSource 所以,问题是是否有可能使用这种或类似的方法来实现,或者我是否必须为这个用例创建一个不同的(定制的)实现。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题