SpringCloud—如何忽略kafka streams应用程序中的某些类型的消息,这些应用程序从同一主题中读取和写入不同的事件类型

ybzsozfc  于 2021-06-06  发布在  Kafka
关注(0)|答案(2)|浏览(416)

假设一个spring云流应用程序创建了一个 KStream 从一个 order topic . 它对 OrderCreated {"id":x, "productId": y, "customerId": z} 事件。一旦一个到达,它就处理它并生成一个输出事件 OrderShipped {"id":x, "productId": y, "customerName": <, "customerAddress": z} 相同的 order topic .
我面临的问题是,由于kafka流应用程序从同一主题读写,因此它试图处理自己的写操作,这是没有意义的。
如何防止此应用程序处理它生成的事件?
更新:正如artem bilan和sobychako指出的,我考虑过使用 KStream.filter() 但有一些细节让我怀疑如何应对:
现在kstream应用程序如下所示:

  1. interface ShippingKStreamProcessor {
  2. ...
  3. @Input("order")
  4. fun order(): KStream<String, OrderCreated>
  5. @Output("output")
  6. fun output(): KStream<String, OrderShipped>

kstream配置

  1. @StreamListener
  2. @SendTo("output")
  3. fun process(..., @Input("order") order: KStream<Int, OrderCreated>): KStream<Int, OrderShipped> {

order和output绑定都指向order主题作为目标。
ordercreated类:

  1. data class OrderCreated(var id: Int?, var productId: Int?, var customerId: Int?) {
  2. constructor() : this(null, null, null)
  3. }

ordershipped类

  1. data class OrderShipped(var id: Int?, var productId: Int?, var customerName: String?, var customerAddress: String?) {
  2. constructor() : this(null, null, null, null)
  3. }

我使用json作为消息格式,因此消息如下所示:
输入-已创建订单: {"id":1, "productId": 7,"customerId": 20} 输出-订单发货: {"id":1, "productId": 7, "customerName": "X", "customerAddress": "Y"} 考虑到以下情况,我正在寻找过滤不需要的消息的最佳方法:
如果我用 KStream.filter() 现在,当我 {"id":1, "productId": 7, "customerName": "X", "customerAddress": "Y"} 我的 KStream<Int, OrderCreated> 将ordershipped事件解组为ordercreated对象,其中包含一些空字段: OrderCreated(id:1, productId: 7, customerId: null) . 检查空字段听起来不太可靠。
一个可能的解决方案是添加另一个字段, eventType = OrderCreated|OrderShipped ,每种使用该主题的消息/类。即使在这种情况下,我也会得到一个ordercreated类(记住kstream<int,ordercreated>),其属性eventtype=ordershipped。这看起来像一个丑陋的解决办法。有什么改进的办法吗?
有没有其他更自动的方法来处理这个问题?例如,如果消息不符合预期的模式(ordercreated),另一种序列化(avro)会阻止消息被处理吗?根据本文,这种在同一主题中支持多个模式(事件类型)的方法似乎是一种很好的实践:https://www.confluent.io/blog/put-several-event-types-kafka-topic/ 但是不清楚如何对不同类型进行解组/反序列化。

3bygqnnd

3bygqnnd1#

您可以使用kafka的记录头来存储记录的类型。见kip-82。您可以在中设置标题 ProducerRecord .
处理过程如下:
阅读 stream 类型 KStream<Integer, Bytes> 有价值的服务 Serdes.BytesSerde 从主题开始。
使用 KStream#transformValues() 过滤和创建对象。更具体地说,在 transformValues() 您可以访问 ProcessorContext 它允许您访问包含有关记录类型信息的记录头。然后:
如果类型为 OrderShipped ,返回 null .
否则创建 OrderCreatedBytes 对象并返回它。
对于使用avro的解决方案,您可能需要查看以下文档
https://docs.confluent.io/current/streams/developer-guide/datatypes.html
https://docs.confluent.io/current/schema-registry/serializer-formatter.html

w8f9ii69

w8f9ii692#

我接受布鲁诺的回答,认为这是解决这个问题的有效方法。不过,我认为我已经找到了一种更直接/更合理的方法,使用带有注解的事件层次结构 JsonTypeInfo .
首先,需要为order事件指定一个基类,并指定所有子类。请注意,json文档中将添加一个类型属性,该属性将帮助jackson封送/解封dto:

  1. @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type")
  2. @JsonSubTypes(value = [
  3. JsonSubTypes.Type(value = OrderCreatedEvent::class, name = "orderCreated"),
  4. JsonSubTypes.Type(value = OrderShippedEvent::class, name = "orderShipped")
  5. ])
  6. abstract class OrderEvent
  7. data class OrderCreatedEvent(var id: Int?, var productId: Int?, var customerId: Int?) : OrderEvent() {
  8. constructor() : this(null, null, null)
  9. }
  10. data class OrderShippedEvent(var id: Int?, var productId: Int?, var customerName: String?, var customerAddress: String?) : OrderEvent () {
  11. constructor() : this(null, null, null, null)
  12. }

有了它,ordercreatedevent对象的生产者将生成如下消息: key: 1 value: {"type":"orderCreated","id":1,"productId":24,"customerId":1} 现在轮到kstream了。我已将签名改为 KStream<Int, OrderEvent> 因为它可以接收ordercreatedevent或ordershippedevent。在接下来的两行中。。。

  1. orderEvent.filter { _, value -> value is OrderCreatedEvent }
  2. .map { key, value -> KeyValue(key, value as OrderCreatedEvent) }

... 我筛选只保留ordercreatedevent类的消息,并将它们Map为转换 KStream<Int, OrderEvent> 变成一个 KStream<Int, OrderCreatedEvent> 全kstream逻辑:

  1. @StreamListener
  2. @SendTo("output")
  3. fun process(@Input("input") input: KStream<Int, Customer>, @Input("order") orderEvent: KStream<Int, OrderEvent>): KStream<Int, OrderShippedEvent> {
  4. val intSerde = Serdes.IntegerSerde()
  5. val customerSerde = JsonSerde<Customer>(Customer::class.java)
  6. val orderCreatedSerde = JsonSerde<OrderCreatedEvent>(OrderCreatedEvent::class.java)
  7. val stateStore: Materialized<Int, Customer, KeyValueStore<Bytes, ByteArray>> =
  8. Materialized.`as`<Int, Customer, KeyValueStore<Bytes, ByteArray>>("customer-store")
  9. .withKeySerde(intSerde)
  10. .withValueSerde(customerSerde)
  11. val customerTable: KTable<Int, Customer> = input.groupByKey(Serialized.with(intSerde, customerSerde))
  12. .reduce({ _, y -> y }, stateStore)
  13. return (orderEvent.filter { _, value -> value is OrderCreatedEvent }
  14. .map { key, value -> KeyValue(key, value as OrderCreatedEvent) }
  15. .selectKey { _, value -> value.customerId } as KStream<Int, OrderCreatedEvent>)
  16. .join(customerTable, { orderIt, customer ->
  17. OrderShippedEvent(orderIt.id, orderIt.productId, customer.name, customer.address)
  18. }, Joined.with(intSerde, orderCreatedSerde, customerSerde))
  19. .selectKey { _, value -> value.id }
  20. //.to("order", Produced.with(intSerde, orderShippedSerde))
  21. }

在这个过程之后,我将生成一条新消息 key: 1 value: {"type":"orderShipped","id":1,"productId":24,"customerName":"Anna","customerAddress":"Cipress Street"} 但这将被流过滤掉。

展开查看全部

相关问题