如何使用spring云流kafka和每个服务的数据库实现微服务事件驱动架构

emeijp43  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(664)

我正在尝试实现一个事件驱动的体系结构来处理分布式事务。每个服务都有自己的数据库,并使用kafka发送消息通知其他微服务操作。
举个例子:

Order service -------> | Kafka |------->Payment Service
       |                                       |
Orders MariaDB DB                   Payment MariaDB Database

订单接收订单请求。它必须将新订单存储在数据库中并发布一条消息,以便支付服务意识到它必须对该项目收费:
私人订单业务;

@PostMapping
public Order createOrder(@RequestBody Order order){
    logger.debug("createOrder()");
    //a.- Save the order in the DB
    orderBusiness.createOrder(order);
    //b. Publish in the topic so that Payment Service charges for the item.
    try{
        orderSource.output().send(MessageBuilder.withPayload(order).build());
    }catch(Exception e){
        logger.error("{}", e);
    }
    return order;
}

这些是我的疑问:
步骤a.-(按顺序db保存)和b.-(发布消息)应该在事务中原子地执行。我怎样才能做到这一点?
这与上一个相关:我发送消息时使用:ordersource.output().send(messagebuilder.withpayload(order.build());这个操作是异步的,并且总是返回true,不管kafka代理是否关闭。我怎么知道消息已经传到Kafka经纪人那里了?

pkbketx9

pkbketx91#

我认为实现事件源的正确方法是让kafka直接从从rdbms binlog读取的插件推送的事件中填充,例如使用合流瓶装水(https://www.confluent.io/blog/bottled-water-real-time-integration-of-postgresql-and-kafka/)或更活跃的debezium(http://debezium.io/). 然后消费微服务可以监听这些事件,消费它们并对它们各自的数据库进行操作,最终与rdbms数据库保持一致。
请看我的完整答案:https://stackoverflow.com/a/43607887/986160

3lxsmp7m

3lxsmp7m2#

步骤a.-(按顺序db保存)和b.-(发布消息)应该在事务中原子地执行。我怎样才能做到这一点?
kafka目前不支持事务(因此也不支持回滚或提交),您需要这样同步事务。所以简而言之:你不能做你想做的事。在不久的将来,当kip-98被合并时,这种情况将会改变,但这可能需要一段时间。另外,即使使用kafka中的事务,跨两个系统的原子事务也是一件非常困难的事情,接下来的所有事情只能通过kafka中的事务支持来改进,它仍然不能完全解决您的问题。为此,您需要研究在您的系统中实现某种形式的两阶段提交。
您可以通过配置producer属性来获得一些帮助,但最终您必须在至少一次或最多一次系统(mariadb或kafka)之间进行选择。
让我们从你在Kafka能做什么开始,确保信息的传递,接下来我们将深入研究你对整个流程的选择,以及结果是什么。
保证交货
您可以配置在使用参数acks将请求返回给您之前,有多少代理必须确认收到您的消息:通过将此设置为all,您可以告诉代理在所有副本都确认您的消息之后再返回答案。这仍然不能100%保证您的消息不会丢失,因为它只被写入页缓存,而且理论上存在代理在持久化到磁盘之前失败的情况,在这种情况下,消息可能仍然会丢失。但这是一个很好的保证。您可以通过降低代理强制fsync到disc的间隔(强调文本和/或flush.ms)来进一步降低数据丢失的风险,但请注意,这些值可能会带来严重的性能损失。
除了这些设置之外,您还需要等待您的Kafka制作者将您的请求的响应返回给您,并检查是否发生异常。这与你问题的第二部分有关,所以我会进一步深入。如果答案是明确的,你就可以尽可能确定你的数据已经传到Kafka,开始担心玛丽亚达。
到目前为止,我们讨论的所有内容都只涉及如何确保Kafka收到了您的消息,但是您还需要将数据写入mariadb,这也可能会失败,这就需要重新调用您可能已经发送给Kafka的消息,而这是您无法做到的。
因此,基本上,您需要选择一个系统,在该系统中,您能够更好地处理重复/丢失的值(取决于是否重新发送部分故障),这将影响您执行操作的顺序。
方案1

在此选项中,您在mariadb中初始化事务,然后将消息发送给kafka,等待响应,如果发送成功,则在mariadb中提交事务。如果发送到Kafka失败,您可以在mariadb中回滚事务,一切都很顺利。但是,如果发送到Kafka是成功的,而您对mariadb的承诺由于某种原因失败,那么就没有办法从Kafka那里得到消息。因此,如果您稍后重新发送所有内容,您将在mariadb中丢失一条消息,或者在kafka中有一条重复的消息。
方案2

这基本上是另一种方式,但是根据您的数据模型,您可能能够更好地删除用mariadb编写的消息。
当然,您可以通过跟踪失败的发送并稍后重试这两种方法来缓解这两种情况,但所有这些在更大的问题上更像是绷带。
就我个人而言,我会选择方法1,因为提交失败的几率应该比发送本身小一些,并在kafka的另一端实现某种dupe检查。
这与上一个相关:我发送消息时使用:ordersource.output().send(messagebuilder.withpayload(order.build());这个操作是异步的,并且总是返回true,不管kafka代理是否关闭。我怎么知道消息已经传到Kafka经纪人那里了?
首先,我承认我不熟悉spring,所以这可能对您没有用处,但是下面的代码片段演示了一种检查生成异常响应的方法。通过调用flush,您可以阻止所有发送,直到所有发送都完成(失败或成功),然后检查结果。

Producer<String, String> producer = new KafkaProducer<>(myConfig);
final ArrayList<Exception> exceptionList = new ArrayList<>();

for(MessageType message : messages){
  producer.send(new ProducerRecord<String, String>("myTopic", message.getKey(), message.getValue()), new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
      if (exception != null) {
        exceptionList.add(exception);
      }
    }
  });
}

producer.flush();

if (!exceptionList.isEmpty()) {
  // do stuff
}

相关问题