在Scala中使用Stream处理大文件

hivapdat  于 2023-08-05  发布在  Scala
关注(0)|答案(2)|浏览(114)

我有一个大的csv文件与用户数据。我有一个端点,它获取一个用户,并应该返回一个布尔值,指示该用户是否存在于文件中。为了避免内存不足,我以流的形式读取文件,并使用getLines,这是一种懒惰的方式。下面是我的代码:

def readUsers(filePath: Path): IO[Seq[User]] = {
    Stream
      .eval(IO(scala.io.Source.fromFile(filePath.toFile)))
      .flatMap(source => Stream.fromIterator[IO](source.getLines(), 64))
      .map(line => line.split(",")
      .map(cols => User(cols(0), cols(1), cols(2), cols(3)))
      .compile
      .toList
  }

  def isUserExists(user: User, users: IO[Seq[User]]): IO[Boolean] =
    users.map(_.contains(user))

字符串
当应用程序启动时,我为users分配一个值:

val users = readUsers(filePath)


并在调用端点时将其发送到isUserExists函数。
我的问题是:

  1. toList是否将整个文件保存在内存中?
    1.如果是,如何避免?我应该删除toList并迭代isUserExists中的每一行吗?
    1.当文件的内容发生变化时,由于users是一个IO值,它是否会反映在端点中?
iswrvxsc

iswrvxsc1#

import cats.effect.IO
import cats.implicits._
import fs2._
import fs2.io.file.Files
import fs2.io.file.Path
import java.nio.file.{Path => JPath}

object SO20230802 {

  val givenUser: User = ???

  def readUsers(filePath: JPath): IO[Boolean] =
    Files[IO]
      .readAll(Path.fromNioPath(filePath))
      .through(text.utf8.decode)
      .through(text.lines)
      .map(_.split(","))
      .map {
        case Array(a, b, c, d) => User(a, b, c, d).some
        case _                 => None
      }
      .unNone
      .dropWhile(_ != givenUser)
      .take(1)
      .as(true)
      .lastOr(false)
      .compile
      .lastOrError

}

字符串
fs2.io将正确管理资源

lg40wkob

lg40wkob2#

是的,你的问题是你把流变成了一个具体而完整的列表。为了从流中受益,你必须尽可能多地使用流函数和引用。我建议readUsers返回一个fs2.Stream[IO, User],然后开始将Streams作为一个“主要”类型,在整个应用程序的签名中使用。随着您对Streams越来越熟悉,使用起来应该会更自然。

import java.nio.file._
  case class User(name :String)
  import cats._
  import cats.effect._
  
  
  def readUsers(filePath: Path): fs2.Stream[IO, User] = {
    fs2.Stream
      .eval(IO(scala.io.Source.fromFile(filePath.toFile))) // please see fs2.io!!!
      .flatMap(source => fs2.Stream.fromIterator[IO](source.getLines()))
      .map(line => line.split(","))
      .map(cols => User(cols(0) /* simplified example, cols(1), cols(2), cols(3) */))
      // removed lines turning steam into a concrete thing.
  }

  def isUserExists(user: User, users: fs2.Stream[IO, User]): IO[Boolean] =
    userList
        .exists(_ == User("henry"))
        .last
        .compile
        .toList
        .map(_.flatten.headOption.getOrElse(false))

  val userList = fs2.Stream.fromIterator[IO](List(
    User("henry"),
    User("donald")
  ).toIterator)
  
  isUserExists(User("donald"), userList)
    .map { ans => println(s"Got this: $ans") }
    .unsafeRunSync()

字符串
就示例而言,这并没有真正展示fs流的强大功能,因为exists()有效地立即耗尽了整个流。我不确定代码是如何使用的,但是您希望只遍历一次流,因此重复isUserExist调用不一定适合最佳用例。

相关问题