Kafka 捕获并处理 Saga 中的消费者抛出的异常

gojuced7  于 12个月前  发布在  Apache
关注(0)|答案(1)|浏览(72)

我正在开发一个使用Kafka并应用 Saga 模式的POC。我能够创建一个快乐的路径,在其中我可以从一个主题消费消息,并通过其他Kafka主题将其发送到多个服务。现在我正在尝试处理消费消息期间的异常。
正如你所看到的,我有一个简单的OrderRequestEvent类型的消费者。这个消费者接收一条消息,然后用这个apiService验证它。如果验证失败,它应该抛出一个异常。我的想法是在 Saga 中捕获/处理这个异常,改变状态并将其发送到另一个主题进行可能的重试,而不最终确定 Saga 。
我的消费者来了:

public class OrderManagementSystemConsumer : IConsumer<OrderRequestEvent>
    {
        private readonly ITopicProducer<CustomerValidationRequestEvent> customerValidationResponseEvent;
        private readonly IApiService apiService;

        public OrderManagementSystemConsumer(
            ITopicProducer<CustomerValidationRequestEvent> customerValidationResponseEvent, 
            IApiService apiService)
        {
            this.customerValidationResponseEvent = customerValidationResponseEvent;
            this.apiService = apiService;
        }

        public async Task Consume(ConsumeContext<OrderRequestEvent> context)
        {
            ArgumentNullException.ThrowIfNull(context, nameof(context));

            if (await this.apiService.ValidateIncomingRequestAsync(context.Message))
                throw new ArgumentException("Something wrong just happened");
        }
    }

字符串
我的 Saga 中处理OrderRequestEvent的部分

Initially(
            When(OrderRequestedEvent)
                .Then(context => LogContext.Info?.Log("Initializing saga: {0}", context.Saga.CorrelationId))
                .InitializeSaga()
                .Then(context => LogContext.Info?.Log("Validating Customer: {0}", context.Saga.CorrelationId))
                .SendingToCustomerValidation().LogSaga()
                .TransitionTo(ValidatingCustomer));


我注意到,即使我的异常在使用者中被正确地抛出, Saga 仍在正常地继续。显然,在使用者中抛出的异常对 Saga 没有任何影响。

xtupzzrd

xtupzzrd1#

简单地说,这是正确的。如果你的终端没有某种形式的工作,消费者就不会“连接”到Sagas。我阅读你的代码的方式是,你有一个处理事件的saga(OrderRequestEvent)和一个消费者,而不是获得事件的自己的副本并做自己的事情。这是系统按预期工作。
如果您希望这些连接起来,那么您需要向 Saga 添加一些逻辑。最少的活动部分是构建一个包含与Consumer相同逻辑的Custom Saga Activity。然后,该Activity的成功或失败将遵循现有的 Saga 逻辑。
一般来说,您希望 Saga 活动快速(按照HTTP请求的顺序),所以如果验证需要依赖于第三方服务,我建议让 Saga 将其委托给消费者。这更接近于您在第一个问题中的内容。在这种情况下,您将遵循Saga Requests Docs并让 Saga 发出请求/你的消费者需要处理的响应,然后响应Request/Response Docs。在你现有的模型中,你只是抛出一个异常,这将作为一个错误被发送回请求,所以你会想在你的 Saga 中处理这个问题。
至于你发布的代码,完全有可能你已经在尝试这样做了,但是代码隐藏在像SendingToCustomerValidationLogSaga这样的方法后面。
最后,这都是由MT支持的,并且经常使用。所以如果它不工作,它几乎总是一个配置问题。确保打开日志进行调试,看看发生了什么。:)

相关问题