我有下面的代码,我正在使用可变列表缓冲区来存储从kafka消费者收到的文件,然后当列表大小达到15时,我将它们插入cassandra。但是他们有没有办法用不变列表做同样的事情呢。
val filesList = ListBuffer[SystemTextFile]()
storeservSparkService.configFilesTopicInBatch.subscribe.atLeastOnce(Flow[SystemTextFile].mapAsync(4) { file: SystemTextFile =>
filesList += file
if (filesList.size == 15) {
storeServSystemRepository.config.insertFileInBatch(filesList.toList)
filesList.clear()
}
Future(Done)
})
2条答案
按热度按时间osh3o9ms1#
沿着这些路线?
ymzxtsji2#
你试过使用向量吗?