ApacheKafka—编写用于在scala的listbuffer中存储数据的不可变代码

ss2ws0br  于 2021-06-04  发布在  Kafka
关注(0)|答案(2)|浏览(370)

我有下面的代码,我正在使用可变列表缓冲区来存储从kafka消费者收到的文件,然后当列表大小达到15时,我将它们插入cassandra。但是他们有没有办法用不变列表做同样的事情呢。

val filesList = ListBuffer[SystemTextFile]()
  storeservSparkService.configFilesTopicInBatch.subscribe.atLeastOnce(Flow[SystemTextFile].mapAsync(4) { file: SystemTextFile =>
    filesList += file
    if (filesList.size == 15) {
      storeServSystemRepository.config.insertFileInBatch(filesList.toList)
      filesList.clear()
    }
    Future(Done)
  })
osh3o9ms

osh3o9ms1#

沿着这些路线?

Flow[SystemTextFile].grouped(15).mapAsync(4){ files =>
  storeServSystemRepository.config.insertFileInBatch(files)
}
ymzxtsji

ymzxtsji2#

你试过使用向量吗?

val filesList = Vector[SystemTextFile]()
      storeservSparkService.configFilesTopicInBatch.subscribe.
          atLeastOnce(Flow[SystemTextFile].mapAsync(4) { file: SystemTextFile =>
       filesList = filesList :+ file
       if (filesList.length == 15) {
            storeServSystemRepository.config.insertFileInBatch(filesList.toList)
       }
       Future(Done)
     })

相关问题