我在努力寻找在Kafka流中第n个事件上执行动作的最佳方式。
我的例子是:我有一个包含一些事件的输入流。我必须按eventtype==login对它们进行过滤,并在同一accountid的第n次登录(比如第五次登录)时将此事件发送到输出流。
经过一些调查和不同的尝试,我得到了下面代码的版本(我使用的是kotlin)。
data class Event(
val payload: Any = {},
val accountId: String,
val eventType: String = ""
)
// intermediate class to keep the key and value of the original event
data class LoginEvent(
val eventKey: String,
val eventValue: Event
)
fun process() {
val userLoginsStoreBuilder = Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("logins"),
Serdes.String(),
Serdes.Integer()
)
val streamsBuilder = StreamsBuilder().addStateStore(userCheckInsStoreBuilder)
val inputStream = streamsBuilder.stream<String, String>(inputTopic)
inputStream.map { key, event ->
KeyValue(key, json.readValue<Event>(event))
}.filter { _, event -> event.eventType == "login" }
.map { key, event -> KeyValue(event.accountId, LoginEvent(key, event)) }
.transform(
UserLoginsTransformer("logins", 5),
"logins"
)
.filter { _, value -> value }
.map { key, _ -> KeyValue(key.eventKey, json.writeValueAsString(key.eventValue)) }
.to("fifth_login", Produced.with(Serdes.String(), Serdes.String()))
...
}
class UserLoginsTransformer(private val storeName: String, private val loginsThreshold: Int = 5) :
TransformerSupplier<String, CheckInEvent, KeyValue< LoginEvent, Boolean>> {
override fun get(): Transformer<String, LoginEvent, KeyValue< LoginEvent, Boolean>> {
return object : Transformer<String, LoginEvent, KeyValue< LoginEvent, Boolean>> {
private lateinit var store: KeyValueStore<String, Int>
@Suppress("UNCHECKED_CAST")
override fun init(context: ProcessorContext) {
store = context.getStateStore(storeName) as KeyValueStore<String, Int>
}
override fun transform(key: String, value: LoginEvent): KeyValue< LoginEvent, Boolean> {
val counter = (store.get(key) ?: 0) + 1
return if (counter == loginsThreshold) {
store.delete(key)
KeyValue(value, true)
} else {
store.put(key, counter)
KeyValue(value, false)
}
}
override fun close() {
}
}
}
}
我最担心的是 transform
在我的例子中,函数不是线程安全的。我已经检查了在我的案例中使用的kv存储的实现,这是rocksdb存储(非事务性的),因此值可能在读取和比较之间更新,错误的事件将被发送到输出。
我的其他想法:
使用物化视图作为一个没有转换器的存储,但我一直坚持实现。
创建一个将使用transactionalrocksdb的自定义持久存储(不确定是否值得)。
创建一个定制的持久性kv存储,它将在内部使用concurrenthashmap(如果我们期望的用户很多,它可能会导致高内存消耗)。
还有一点需要注意:我使用的是springcloudstream,所以这个框架可能为我的案例提供了一个内置的解决方案,但我没有找到它。
如有任何建议,我将不胜感激。提前谢谢。
1条答案
按热度按时间5tmbdcev1#
我最担心的是转换函数在我的例子中不是线程安全的。我已经检查了在我的案例中使用的kv存储的实现,这是rocksdb存储(非事务性的),因此值可能在读取和比较之间更新,错误的事件将被发送到输出。
没有理由担心。如果使用多个线程运行,则每个线程都有自己的rocksdb,它存储一个总体数据碎片(请注意,总体状态是基于输入主题分区的碎片,而单个碎片永远不会由不同的线程处理)。因此,您的代码将正常工作。您唯一需要确保的是,数据是按分区的
accountId
,使得单个帐户的登录事件转到同一个shard。如果您输入的数据已经被
accountId
当写进你的输入主题时,你不需要做任何事情。如果不是,并且您可以控制上游应用程序,那么在上游的应用程序生成器中使用自定义分区器来获得所需的分区可能是最简单的。如果无法更改上游应用程序,则需要在设置accountId
作为新钥匙through()
在你打电话之前transform()
.