如何在monix中为每个任务添加固定延迟而不增加延迟

64jmpszr  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(211)

我正在用monix从Kafka那里获取消息。
我想在从主题中读取消息10秒后处理该消息。
10秒一分钟的延迟不应该阻止读取更多消息。
我尝试使用以下代码测试这种行为,使用task.delayexecution来延迟10秒
我也试过了 Observable.delayOnExecution 以及 Observable.delayOnNext ```
import monix.eval.Task
import monix.reactive.Observable

object TestApp extends App {
import java.util.Date
case class SomeClass(
value: Int, consumingDate: Date,
handlerExecutionDate: Date = null
)
import scala.concurrent.duration._
import scala.concurrent.Await
import monix.execution.Scheduler.Implicits.global

val res = Observable.fromIterable((1 to 100).map(i => SomeClass(value = i, consumingDate = new Date)))
.mapEval(i =>
Task.delay(
SomeClass(
value = i.value,
consumingDate = i.consumingDate
)
).delayExecution(10 seconds)
)
.foreachL{ x =>
val r = SomeClass(
x.value,
x.consumingDate,
new Date()
)
println(r)
}.runToFuture
Await.result(res, 100.seconds)
}

但是上面的代码为每条消息增加了延迟。第一条消息延迟10秒,但第二条消息延迟20秒,第三条消息延迟30秒,以此类推。
有没有可能用monix做这样的事情?
我正在考虑其他替代基于monix的解决方案,比如使用内存队列。消费者会一直推到队列,直到达到限制,然后。
更新:
我找到了一个解决方法 `Task.eval(<current_time>).restartUntil(<condition>)` 添加下面的代码。

package com.agoda.cusco.campaign.data.collector.consumer

import monix.eval.Task
import monix.reactive.Observable

object TestApp extends App {
import java.util.Date
case class SomeClass(
value: Int, consumingDate: Date,
handlerExecutionDate: Date = null
)
import scala.concurrent.duration._
import scala.concurrent.Await
import monix.execution.Scheduler.Implicits.global

val res = Observable.fromIterable((1 to 100).map(i => SomeClass(value = i, consumingDate = new Date(System.currentTimeMillis + i*100))))
.mapEval(message =>
Task.eval(new Date()).restartUntil(currentTime => (currentTime.getSeconds - message.consumingDate.getSeconds) > 10).map(_ => message)
)
.foreachL{ x =>
val r = SomeClass(
x.value,
x.consumingDate,
new Date()
)
println(r)
}.runToFuture
Await.result(res, 100.seconds)
}

我不完全确定它是否是理想的解决方案,因为它似乎在做活动的cpu计算,以使其工作。
想看看有没有更好的替代品。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题