这里有一个简单的场景。我正在使用akka流从kafka读取数据,并将其写入一个外部源,在我的例子中是:cassandra。
阿克卡流(ReactKafka)图书馆配备了我与背压和其他漂亮的东西,使这成为可能。
Kafka是一个源,Cassandra是一个接收器,当我得到一堆事件时,比如Cassandra通过Kafka在这里的查询,这些查询应该是按顺序执行的(例如:它可以是插入、更新和删除,并且必须是按顺序执行的)。
我不能用 mayAsync
并且执行这两个语句,future是急切的,并且有可能在insert之前先执行delete或update。
我不得不用Cassandra的 execute
相对于 executeAsync
这是非阻塞的。
没有办法为这个问题提供一个完整的异步解决方案,但是有没有一个非常优雅的方法来做到这一点呢?
例如:使未来变得懒惰和有序,并将其卸载到不同的执行上下文中。 mapAsync
也提供了一个并行选项。
monix任务在这里有帮助吗?
这是一个一般的设计问题,可以采取什么方法。
更新:
Flow[In].mapAsync(3)(input => {
input match {
case INSERT => //do insert - returns future
case UPDATE => //do update - returns future
case DELETE => //delete - returns future
}
情况要复杂一点。可能会有成千上万的插入,更新和删除来为特定的关键(在Kafka)我会理想地想执行一个单一的密钥顺序3期货。我相信莫妮克斯的任务能帮上忙?
3条答案
按热度按时间3zwjbxry1#
如果您想对从数据库操作中获得的信息进行处理,可以使用akka streams子流按键分组,然后合并子流:
请注意,来自输入源的顺序不会像在中那样保留在输出中
mapAsync
,但对同一个键的所有操作仍将是有序的。eoxn13cs2#
你在找什么
Future.flatMap
:它执行第一个函数,然后
Future
满意了,开始第二个。这个result
是一个新的Future
当第二次执行的结果满足时,这就完成了。第一个未来的结果被传递到您赋予的函数中
.flatMap
,因此第二个函数可以依赖于第一个函数的结果。例如:你也可以把它写成
for-comprehension
:ycggw6v23#
如果你用1的并行度处理事物,它们将以严格的顺序执行,这将解决你的问题。
但这并不有趣。如果您愿意,您可以并行运行不同键的操作—如果对不同键的处理是独立的,那么根据您的描述,这是可能的。要做到这一点,您必须缓冲传入的值,然后重新组合它。让我们看看一些代码:
这个
concatMap
这里的操作符将确保您的块也按顺序处理。所以即使有一个缓冲区key1 -> insert, key1 -> update
另一个有key1 -> delete
,这不会引起任何问题。在monix中,这与flatMap
,但在其他rx库中flatMap
可能是的别名mergeMap
没有订购保证。这可以通过
Future
同样,虽然没有标准的“顺序遍历”,所以你必须自己滚动,比如: