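I have 4 consumers in the same consumer group reading from a single topic. Two of the four are assigned multiple partitions but only receive data from one of them; the other two receive data from every partition assigned to them.
How can I make each consumer receive data from all of the partitions assigned to it?
I am using fs2-kafka.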
import java.util.UUID
import java.util.concurrent.TimeUnit
import scala.concurrent.duration.FiniteDuration
import cats.effect.ConcurrentEffect
import fs2.Pipe
import fs2.kafka._

val brokers = "broker1:9094,broker2:9094,broker3:9094,broker4:9094"

val consumerSettings = ConsumerSettings[F, String, Array[Byte]]
  .withEnableAutoCommit(false)
  .withAutoOffsetReset(AutoOffsetReset.Earliest)
  .withMaxPollInterval(new FiniteDuration(300000, TimeUnit.MILLISECONDS))
  .withPollTimeout(new FiniteDuration(300000, TimeUnit.MILLISECONDS))
  .withSessionTimeout(new FiniteDuration(30000, TimeUnit.MILLISECONDS))
  .withHeartbeatInterval(new FiniteDuration(10000, TimeUnit.MILLISECONDS))
  .withCommitTimeout(new FiniteDuration(300000, TimeUnit.MILLISECONDS))
  .withBootstrapServers(brokers)
  .withClientId(UUID.randomUUID().toString)
  .withGroupId("testConsumerGroupId")

def processChunks(
  data: fs2.Chunk[CommittableConsumerRecord[F, K, V]]
)(implicit ce: ConcurrentEffect[F]): fs2.Stream[F, CommittableOffset[F]] =
  for {
    _ <- fs2.Stream.eval(handleChunkOfWork(data.map(_.record).toList))
    offset <- fs2.Stream.chunk(data).map(_.offset)
  } yield offset

def commitBatches(implicit ce: ConcurrentEffect[F]): Pipe[F, CommittableOffset[F], Unit] =
  _.chunks
    .evalMap { chunk =>
      for {
        _ <- CommittableOffsetBatch.fromFoldable(chunk).commit
      } yield ()
    }

consumerStream[F]
  .using(kafkaConsumerSettings.consumerSettings)
  .evalTap(_.subscribe("topic1"))
  .flatMap(_.partitionedStream)
  .flatMap { partionedStream =>
    partionedStream.chunkLimit(streamConfig.batchSize).map(processChunks).map(commitBatches)
  }
  .parJoinUnbounded
  .compile
  .drain
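
One possible cause, offered as an assumption rather than a confirmed diagnosis: the second `flatMap` runs the per-partition streams one after another, and a partition stream normally does not end while the partition stays assigned, so later partitions may never be pulled. What `parJoinUnbounded` then runs concurrently would be the chunks of a single partition, not the partitions themselves. The sketch below uses `map` instead, so that `parJoinUnbounded` joins one inner stream per partition. It assumes an fs2-kafka 1.x style API on cats-effect 2 (matching the `ConcurrentEffect` and `consumerStream[F].using` calls in the question) and uses `subscribeTo` for the single topic name. `consumeAllPartitions`, `handle` and `batchSize` are hypothetical names introduced only for this illustration.

```scala
import cats.effect.{ConcurrentEffect, ContextShift, Timer}
import cats.syntax.all._
import fs2.kafka._

// Hypothetical helper: `handle` stands in for handleChunkOfWork,
// `batchSize` for streamConfig.batchSize.
def consumeAllPartitions[F[_]: ConcurrentEffect: ContextShift: Timer](
    settings: ConsumerSettings[F, String, Array[Byte]],
    batchSize: Int
)(handle: List[ConsumerRecord[String, Array[Byte]]] => F[Unit]): F[Unit] =
  consumerStream[F]
    .using(settings)
    .evalTap(_.subscribeTo("topic1"))
    .flatMap(_.partitionedStream)
    // map (not flatMap): keep one inner stream per assigned partition
    .map { partitionStream =>
      partitionStream
        .chunkLimit(batchSize)
        // within a partition, process a batch and then commit its offsets, in order
        .evalMap { chunk =>
          handle(chunk.map(_.record).toList) *>
            CommittableOffsetBatch.fromFoldable(chunk.map(_.offset)).commit
        }
    }
    // run all partition streams concurrently
    .parJoinUnbounded
    .compile
    .drain
```

Handling each partition's batches sequentially inside its own stream also keeps commits for that partition in offset order, which the original pipeline does not guarantee if several chunks of one partition run at the same time.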