apache kafka+kafka streams-如何确保原子性/事务性

4nkexdtk  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(432)

我正在评估apachekafka流的事件源,看看它对于复杂场景的可行性。与关系数据库一样,我遇到过一些情况,原子性/事务性是必不可少的:
提供两种服务的购物应用程序:
orderservice:有一个kafka流存储和orders(ordersstore)
产品服务:有一个Kafka流商店(productstockstore)和产品及其库存。
流量:
orderservice发布ordercreated事件(带有productid、orderid、userid信息)
productservice获取ordercreated事件并查询其kafkastreams存储(productstockstore)以检查产品是否有库存。如果有库存,它会发布orderupdated事件(还包括productid、orderid、userid信息)
关键是这个事件将由productservice kafka stream监听,它将处理它以减少库存,到目前为止还不错。
但是,想象一下:
客户1下订单,订单1(产品的库存为1)
客户2同时为同一产品下另一个订单order2(库存仍然是1)
productservice处理order1并发送消息orderupdated以减少库存。此消息放在order2->ordercreated的主题之后
productservice处理order2 ordercreated并发送消息orderupdated以再次减少库存。这是不正确的,因为它会导致不一致(现在库存应该是0)。
显而易见的问题是,当我们处理第一个orderupdated事件时,我们的物化视图(存储)应该直接更新。但是,更新kafka流存储的唯一方法(我知道)是发布另一个事件(orderupdated),由kafka流处理。这样我们就不能事务性地执行此更新。
我希望能有一些想法来处理这样的情况。
更新:我将尝试澄清问题中有问题的部分:
productservice有一个kafka streams商店,productstock有这个库存 (productId=1, quantity=1) orderservice在orders主题上发布两个orderplaced事件:
Event1 (key=product1, productId=product1, quantity=1, eventType="OrderPlaced") Event2 (key=product1, productId=product1, quantity=1, eventType="OrderPlaced") productservice在orders主题上有一个使用者。为了简单起见,让我们假设一个分区来保证消息按顺序使用。此使用者执行以下逻辑:

if("OrderPlaced".equals(event.get("eventType"))){

    Order order = new Order();
    order.setId((String)event.get("orderId"));
    order.setProductId((Integer)(event.get("productId")));
    order.setUid(event.get("uid").toString());

    // QUERY PRODUCTSTOCK TO CHECK AVAILABILITY
    Integer productStock = getProductStock(order.getProductId());

    if(productStock > 0) {
        Map<String, Object> event = new HashMap<>();
        event.put("name", "ProductReserved");
        event.put("orderId", order.getId());
        event.put("productId", order.getProductId());

        // WRITES A PRODUCT RESERVED EVENT TO orders topic
        orderProcessor.output().send(MessageBuilder.withPayload(event).build(), 500);
    }else{
        //XXX CANCEL ORDER
    }
}

productservice还有一个kafka streams处理器,负责更新库存:

KStream<Integer, JsonNode> stream = kStreamBuilder.stream(integerSerde, jsonSerde, "orders");
stream.xxx().yyy(() -> {...}, "ProductsStock");

event1将首先被处理,因为仍然有1个可用产品,所以它将生成productreserved事件。
现在,轮到event2了。如果在productservice kafka streams处理器处理由event1生成的productreseved事件之前,它被productservice使用者使用,则使用者仍会看到product1的productstore库存为1,为event2生成productreserved事件,然后在系统中产生不一致。

vu8f3i0k

vu8f3i0k1#

这个答案对于你最初的问题来说有点晚了,但是为了完整起见还是让我来回答吧。
有很多方法可以解决这个问题,但我鼓励用事件驱动的方法来解决这个问题。这意味着您(a)验证是否有足够的库存来处理订单,以及(b)将库存保留为单个,全部在单个kstreams操作中。诀窍是按productid重新设置密钥,这样您就知道相同产品的订单将在同一线程上按顺序执行(因此您不能进入order1和order2两次保留相同产品库存的情况)。
有一个帖子讨论了如何做到这一点:https://www.confluent.io/blog/building-a-microservices-ecosystem-with-kafka-streams-and-ksql/
也许更有用的是,还有一些示例代码也展示了如何做到这一点:https://github.com/confluentinc/kafka-streams-examples/blob/1cbcaddd85457b39ee6e9050164dc619b08e9e7d/src/main/java/io/confluent/examples/streams/microservices/inventoryservice.java#l76
请注意,在这个kstreams代码中,第一行是如何将关键字重新设置为productid的,然后使用转换器来(a)验证是否有足够的库存来处理订单,以及(b)保留更新状态存储所需的库存。这是原子化的,使用kafka的事务特性。

cetgtptt

cetgtptt2#

在保证任何分布式系统的一致性时,同样的问题也很典型。通常使用processmanager/saga模式,而不是追求强一致性。这有点类似于分布式事务中的两阶段提交,但在应用程序代码中显式实现。它是这样的:
订单服务要求产品服务保留n个项目。产品服务要么接受该命令并减少库存,要么在没有足够的可用项时拒绝该命令。在对命令做出肯定回复后,order服务现在可以发出ordercreated事件(尽管我称之为orderplaced,因为“placed”听起来像是域的惯用模式,“created”更通用,但这只是一个细节)。产品服务侦听orderplaced事件或向其发送显式confirmresevation命令。或者,如果发生了其他事情(例如,未能清算资金),则可以发出适当的事件,或者将cancelreservation命令显式发送到productservice。为了满足特殊情况的需要,productservice还可能有一个调度器(在kafkastreams中,标点符号很方便)来取消在超时期间未确认或中止的预订。
这两个服务的编排以及处理错误条件和补偿操作(在这种情况下取消保留)的技术细节可以直接在服务中处理,也可以在一个显式的processmanager组件中处理,以分隔此职责。就我个人而言,我倾向于使用kafka streams处理器api实现一个显式的流程管理器。

相关问题