// #atLeastOnceBatch
Consumer.Control control =
Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
.mapAsync(1, msg ->
business(msg.record().key(), msg.record().value())
.thenApply(done -> msg.committableOffset())
)
.batch(
20,
ConsumerMessage::createCommittableOffsetBatch,
ConsumerMessage.CommittableOffsetBatch::updated
)
.mapAsync(3, c -> c.commitJavadsl())
.to(Sink.ignore())
.run(materializer);
// #atLeastOnceBatch
我试图测试alpakkafka连接器至少一次批处理示例,我得到以下编译时错误
consumermessage类型未定义适用于此处的createcommittableoffsetbatch(consumermessage.committableoffset)
并且类型consumermessage.committableoffsetbatch未定义适用于此处的更新(s,consumermessage.committableoffset)
1条答案
按热度按时间ppcbkaq51#
这些版本在V0.22中提供。不幸的是,与akka文档相比,alpakka文档缺少一点。