我已经创建了一个kafkastreamconsumer,上面提到了默认配置https://doc.akka.io/docs/alpakka-kafka/0.11/consumer.html. 消费者工作正常;但是,在使用第一条消息之前有一个初始延迟。第一条消息之后的消息几乎是实时的。请建议如果我需要调整一些配置。我使用下面的代码来创建消费者-
val consumer: RunnableGraph[DrainingControl[Done]] =
Consumer
.committableSource(consumerSettings, Subscriptions.topicPattern(conf.topic))
.mapAsync(conf.workers)(KafkaMessageHandler(transactionWorker.handleTransactionMessage))
.groupedWithin(conf.batchSize, 1.millis)
.map(CommittableOffsetBatch(_))
.mapAsync(conf.workers)(offset => {
logger.debug(s"committing ${offset.getOffsets}")
//TODO
offset.commitScaladsl()
})
.toMat(Sink.ignore)(Keep.both)
.mapMaterializedValue(DrainingControl.apply)
暂无答案!
目前还没有任何答案,快来回答吧!