如何在kafka streams中为流添加冷却时间/速率限制?

kxeu7u2r  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(344)

我是流式数据处理的新手,我觉得这是一个非常基本的用例。
假设我有一股 (User, Alert) 元组。我想要的是限制每个用户的流量。i、 我想要一个只为用户输出一次警报的流。在接下来的60分钟里,用户收到的任何警报都应该被吞没。在这60分钟之后,将再次触发传入警报。
我尝试的是:
使用 aggregate 作为有状态的转换,但聚合状态是时间相关的。然而,即使 KTable 如果聚合值没有变化,则ktable(作为changelog)将继续向下发送元素,从而无法实现流“速率限制”的预期效果

val fooStream: KStream[String, String] = builder.stream("foobar2")
fooStream
  .groupBy((key, string) => string)
  .aggregate(() => "constant",
    (aggKey: String, value: String, aggregate: String) => aggregate,
    stringSerde,
    "name")
  .print

提供以下输出:

[KSTREAM-AGGREGATE-0000000004]: string , (constant<-null)
[KSTREAM-AGGREGATE-0000000004]: string , (constant<-null)

我一般不清楚怎么/什么时候 aggregate 决定向下游发布元素。我最初的理解是,这是立竿见影的,但事实似乎并非如此。据我所知,开窗对这里没用。
有没有可能kafka streams dsl目前没有考虑到这种状态转换的用例,类似于spark的updatestatebykey或akka的statefulmapconcat?我是否必须使用较低级别的处理器/转换器api?
编辑:
可能的重复确实涉及到这样一个问题:记录缓存是如何在聚合决定向下游发布元素时造成一些混乱的。然而,首要的问题是如何在dsl中实现“速率限制”。正如@miguno所指出的,我们必须恢复到较低级别的处理器api。下面我粘贴了一个非常冗长的方法:

val logConfig = new util.HashMap[String, String]();
  // override min.insync.replicas
  logConfig.put("min.insyc.replicas", "1")

  case class StateRecord(alert: Alert, time: Long)

  val countStore = Stores.create("Limiter")
    .withKeys(integerSerde)
    .withValues(new JsonSerde[StateRecord])
    .persistent()
    .enableLogging(logConfig)
    .build();
  builder.addStateStore(countStore)

  class RateLimiter extends Transformer[Integer, Alert, KeyValue[Integer, Alert]] {
    var context: ProcessorContext = null;
    var store: KeyValueStore[Integer, StateRecord] = null;

    override def init(context: ProcessorContext) = {
      this.context = context
      this.store = context.getStateStore("Limiter").asInstanceOf[KeyValueStore[Integer, StateRecord]]
    }

    override def transform(key: Integer, value: Alert) = {
      val current = System.currentTimeMillis()
      val newRecord = StateRecord(value._1, value._2, current)
      store.get(key) match {
        case StateRecord(_, time) if time + 15.seconds.toMillis < current => {
          store.put(key, newRecord)
          (key, value)
        }
        case StateRecord(_, _) => null
        case null => {
          store.put(key, newRecord)
          (key, value)
        }
      }
    }
  }
toiithl6

toiithl61#

假设我有一股 (User, Alert) 元组。我想要的是限制每个用户的流量。i、 我想要一个只为用户输出一次警报的流。在接下来的60分钟里,用户收到的任何警报都应该被吞没。在这60分钟之后,将再次触发传入警报。
这在使用Kafka流的dsl时是不可能的。相反,您可以(并且需要)使用较低级别的处理器api手动实现这种行为。
仅供参考:我们已经在Kafka社区就是否在dsl中添加此类功能(通常称为“触发器”)进行了讨论。到目前为止,决定暂时不使用这种功能。
我一般不清楚怎么/什么时候 aggregate 决定向下游发布元素。我最初的理解是,这是立竿见影的,但事实似乎并非如此。
是的,这是Kafka0.10.0.0的最初行为。从那时起(不确定您使用的是什么版本),我们引入了记录缓存;如果您禁用了记录缓存,那么您将恢复最初的行为,不过据我所知,记录缓存将为您提供某种(间接的)速率限制旋钮。因此,您可能希望保持缓存处于启用状态。
不幸的是,apachekafka文档还没有涵盖记录缓存,同时您可能需要阅读http://docs.confluent.io/current/streams/developer-guide.html#memory-而不是管理层。

相关问题