scala—在db非阻塞系统中进行后续顺序插入的执行顺序

7gs2gvoe  于 2021-06-07  发布在  Kafka
关注(0)|答案(3)|浏览(326)

这里有一个简单的场景。我正在使用akka流从kafka读取数据,并将其写入一个外部源,在我的例子中是:cassandra。
阿克卡流(ReactKafka)图书馆配备了我与背压和其他漂亮的东西,使这成为可能。
Kafka是一个源,Cassandra是一个接收器,当我得到一堆事件时,比如Cassandra通过Kafka在这里的查询,这些查询应该是按顺序执行的(例如:它可以是插入、更新和删除,并且必须是按顺序执行的)。
我不能用 mayAsync 并且执行这两个语句,future是急切的,并且有可能在insert之前先执行delete或update。
我不得不用Cassandra的 execute 相对于 executeAsync 这是非阻塞的。
没有办法为这个问题提供一个完整的异步解决方案,但是有没有一个非常优雅的方法来做到这一点呢?
例如:使未来变得懒惰和有序,并将其卸载到不同的执行上下文中。 mapAsync 也提供了一个并行选项。
monix任务在这里有帮助吗?
这是一个一般的设计问题,可以采取什么方法。
更新:

Flow[In].mapAsync(3)(input => {

 input match {
    case INSERT => //do insert - returns future
    case UPDATE => //do update - returns future
    case DELETE => //delete - returns future
}

情况要复杂一点。可能会有成千上万的插入,更新和删除来为特定的关键(在Kafka)我会理想地想执行一个单一的密钥顺序3期货。我相信莫妮克斯的任务能帮上忙?

3zwjbxry

3zwjbxry1#

如果您想对从数据库操作中获得的信息进行处理,可以使用akka streams子流按键分组,然后合并子流:

def databaseOp(input: In): Future[Out] = input match {
  case INSERT => ...
  case UPDATE => ...
  case DELETE => ...
}

val databaseFlow: Flow[In, Out, NotUsed] =
  Flow[In].groupBy(Int.maxValues, _.key).mapAsync(1)(databaseOp).mergeSubstreams

请注意,来自输入源的顺序不会像在中那样保留在输出中 mapAsync ,但对同一个键的所有操作仍将是有序的。

eoxn13cs

eoxn13cs2#

你在找什么 Future.flatMap :

def doSomething: Future[Unit]
def doSomethingElse: Future[Unit]

val result = doSomething.flatMap { _ => doSomethingElse }

它执行第一个函数,然后 Future 满意了,开始第二个。这个 result 是一个新的 Future 当第二次执行的结果满足时,这就完成了。
第一个未来的结果被传递到您赋予的函数中 .flatMap ,因此第二个函数可以依赖于第一个函数的结果。例如:

def getUserID: Future[Int]
def getUser(id: Int): Future[User]

val userName: Future[String] = getUserID.flatMap(getUser).map(_.name)

你也可以把它写成 for-comprehension :

for {
  id <- getUserID
  user <- getUser(id)
} yield user.name
ycggw6v2

ycggw6v23#

如果你用1的并行度处理事物,它们将以严格的顺序执行,这将解决你的问题。
但这并不有趣。如果您愿意,您可以并行运行不同键的操作—如果对不同键的处理是独立的,那么根据您的描述,这是可能的。要做到这一点,您必须缓冲传入的值,然后重新组合它。让我们看看一些代码:

import monix.reactive.Observable
import scala.concurrent.duration._

import monix.eval.Task

// Your domain logic - I'll use these stubs
trait Event
trait Acknowledgement // whatever your DB functions return, if you need it
def toKey(e: Event): String = ???
def processOne(event: Event): Task[Acknowledgement] = Task.deferFuture {
  event match {
    case _ => ??? // insert/update/delete
  }
}

// Monix Task.traverse is strictly sequential, which is what you need
def processMany(evs: Seq[Event]): Task[Seq[Acknowledgement]] =
  Task.traverse(evs)(processOne)

def processEventStreamInParallel(source: Observable[Event]): Observable[Acknowledgement] =
  source
    // Process a bunch of events, but don't wait too long for whole 100. Fine-tune for your data source
    .bufferTimedAndCounted(2.seconds, 100)
    .concatMap { batch =>
      Observable
        .fromIterable(batch.groupBy(toKey).values) // Standard collection methods FTW
        .mapAsync(3)(processMany) // processing up to 3 different keys in parallel - tho 3 is not necessary, probably depends on your DB throughput
        .flatMap(Observable.fromIterable) // flattening it back
    }

这个 concatMap 这里的操作符将确保您的块也按顺序处理。所以即使有一个缓冲区 key1 -> insert, key1 -> update 另一个有 key1 -> delete ,这不会引起任何问题。在monix中,这与 flatMap ,但在其他rx库中 flatMap 可能是的别名 mergeMap 没有订购保证。
这可以通过 Future 同样,虽然没有标准的“顺序遍历”,所以你必须自己滚动,比如:

def processMany(evs: Seq[Event]): Future[Seq[Acknowledgement]] =
  evs.foldLeft(Future.successful(Vector.empty[Acknowledgement])){ (acksF, ev) =>
    for {
      acks <- acksF
      next <- processOne(ev)
    } yield acks :+ next
  }

相关问题