scala—一些使用者只获取一个分区的数据,而一些使用者则获取相同使用者组id的所有分区的数据

esyap4oy  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(252)

我有4个使用者使用使用者组id为一个主题使用数据。在4个使用者中,有2个使用者虽然分配给多个分区,但只获取一个分区的数据,还有2个使用者获取分配给该使用者的所有分区的数据。
如何让使用者获得分配给它的所有分区的数据?
我用的是fs2Kafka。

val brokers = "broker1:9094,broker2:9094,broker3:9094,broker4:9094"

val consumerSettings = ConsumerSettings[F, String, Array[Byte]]
      .withEnableAutoCommit(false)
      .withAutoOffsetReset(AutoOffsetReset.Earliest)
      .withMaxPollInterval(new FiniteDuration(300000, TimeUnit.MILLISECONDS))
      .withPollTimeout(new FiniteDuration(300000, TimeUnit.MILLISECONDS))
      .withSessionTimeout(new FiniteDuration(30000, TimeUnit.MILLISECONDS))
      .withHeartbeatInterval(new FiniteDuration(10000, TimeUnit.MILLISECONDS))
      .withCommitTimeout(new FiniteDuration(300000, TimeUnit.MILLISECONDS))
      .withBootstrapServers(brokers)
      .withClientId(UUID.randomUUID().toString)
      .withGroupId("testConsumerGroupId")

 def processChunks(
      data: fs2.Chunk[CommittableConsumerRecord[F, K, V]]
  )(implicit ce: ConcurrentEffect[F]): fs2.Stream[F, CommittableOffset[F]] =
    for {
      _ <- fs2.Stream.eval(handleChunkOfWork(data.map(_.record).toList))
      offset <- fs2.Stream.chunk(data).map(_.offset)
    } yield offset

 def commitBatches(implicit ce: ConcurrentEffect[F]): Pipe[F, CommittableOffset[F], Unit] =
    _.chunks
      .evalMap { chunk =>
        for {
          _ <- CommittableOffsetBatch.fromFoldable(chunk).commit
        } yield ()
      }

consumerStream[F]
  .using(kafkaConsumerSettings.consumerSettings)
  .evalTap(_.subscribe("topic1"))
  .flatMap(_.partitionedStream)
  .flatMap { partionedStream =>
    partionedStream.chunkLimit(streamConfig.batchSize).map(processChunks).map(commitBatches)
  }
  .parJoinUnbounded
  .compile
  .drain

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题