我是流式数据处理的新手,我觉得这是一个非常基本的用例。
假设我有一股 (User, Alert)
元组。我想要的是限制每个用户的流量。i、 我想要一个只为用户输出一次警报的流。在接下来的60分钟里,用户收到的任何警报都应该被吞没。在这60分钟之后,将再次触发传入警报。
我尝试的是:
使用 aggregate
作为有状态的转换,但聚合状态是时间相关的。然而,即使 KTable
如果聚合值没有变化,则ktable(作为changelog)将继续向下发送元素,从而无法实现流“速率限制”的预期效果
val fooStream: KStream[String, String] = builder.stream("foobar2")
fooStream
.groupBy((key, string) => string)
.aggregate(() => "constant",
(aggKey: String, value: String, aggregate: String) => aggregate,
stringSerde,
"name")
.print
提供以下输出:
[KSTREAM-AGGREGATE-0000000004]: string , (constant<-null)
[KSTREAM-AGGREGATE-0000000004]: string , (constant<-null)
我一般不清楚怎么/什么时候 aggregate
决定向下游发布元素。我最初的理解是,这是立竿见影的,但事实似乎并非如此。据我所知,开窗对这里没用。
有没有可能kafka streams dsl目前没有考虑到这种状态转换的用例,类似于spark的updatestatebykey或akka的statefulmapconcat?我是否必须使用较低级别的处理器/转换器api?
编辑:
可能的重复确实涉及到这样一个问题:记录缓存是如何在聚合决定向下游发布元素时造成一些混乱的。然而,首要的问题是如何在dsl中实现“速率限制”。正如@miguno所指出的,我们必须恢复到较低级别的处理器api。下面我粘贴了一个非常冗长的方法:
val logConfig = new util.HashMap[String, String]();
// override min.insync.replicas
logConfig.put("min.insyc.replicas", "1")
case class StateRecord(alert: Alert, time: Long)
val countStore = Stores.create("Limiter")
.withKeys(integerSerde)
.withValues(new JsonSerde[StateRecord])
.persistent()
.enableLogging(logConfig)
.build();
builder.addStateStore(countStore)
class RateLimiter extends Transformer[Integer, Alert, KeyValue[Integer, Alert]] {
var context: ProcessorContext = null;
var store: KeyValueStore[Integer, StateRecord] = null;
override def init(context: ProcessorContext) = {
this.context = context
this.store = context.getStateStore("Limiter").asInstanceOf[KeyValueStore[Integer, StateRecord]]
}
override def transform(key: Integer, value: Alert) = {
val current = System.currentTimeMillis()
val newRecord = StateRecord(value._1, value._2, current)
store.get(key) match {
case StateRecord(_, time) if time + 15.seconds.toMillis < current => {
store.put(key, newRecord)
(key, value)
}
case StateRecord(_, _) => null
case null => {
store.put(key, newRecord)
(key, value)
}
}
}
}
1条答案
按热度按时间toiithl61#
假设我有一股
(User, Alert)
元组。我想要的是限制每个用户的流量。i、 我想要一个只为用户输出一次警报的流。在接下来的60分钟里,用户收到的任何警报都应该被吞没。在这60分钟之后,将再次触发传入警报。这在使用Kafka流的dsl时是不可能的。相反,您可以(并且需要)使用较低级别的处理器api手动实现这种行为。
仅供参考:我们已经在Kafka社区就是否在dsl中添加此类功能(通常称为“触发器”)进行了讨论。到目前为止,决定暂时不使用这种功能。
我一般不清楚怎么/什么时候
aggregate
决定向下游发布元素。我最初的理解是,这是立竿见影的,但事实似乎并非如此。是的,这是Kafka0.10.0.0的最初行为。从那时起(不确定您使用的是什么版本),我们引入了记录缓存;如果您禁用了记录缓存,那么您将恢复最初的行为,不过据我所知,记录缓存将为您提供某种(间接的)速率限制旋钮。因此,您可能希望保持缓存处于启用状态。
不幸的是,apachekafka文档还没有涵盖记录缓存,同时您可能需要阅读http://docs.confluent.io/current/streams/developer-guide.html#memory-而不是管理层。