我正在开发一个使用Kafka并应用 Saga 模式的POC。我能够创建一个快乐的路径,在其中我可以从一个主题消费消息,并通过其他Kafka主题将其发送到多个服务。现在我正在尝试处理消费消息期间的异常。
正如你所看到的,我有一个简单的OrderRequestEvent
类型的消费者。这个消费者接收一条消息,然后用这个apiService
验证它。如果验证失败,它应该抛出一个异常。我的想法是在 Saga 中捕获/处理这个异常,改变状态并将其发送到另一个主题进行可能的重试,而不最终确定 Saga 。
我的消费者来了:
public class OrderManagementSystemConsumer : IConsumer<OrderRequestEvent>
{
private readonly ITopicProducer<CustomerValidationRequestEvent> customerValidationResponseEvent;
private readonly IApiService apiService;
public OrderManagementSystemConsumer(
ITopicProducer<CustomerValidationRequestEvent> customerValidationResponseEvent,
IApiService apiService)
{
this.customerValidationResponseEvent = customerValidationResponseEvent;
this.apiService = apiService;
}
public async Task Consume(ConsumeContext<OrderRequestEvent> context)
{
ArgumentNullException.ThrowIfNull(context, nameof(context));
if (await this.apiService.ValidateIncomingRequestAsync(context.Message))
throw new ArgumentException("Something wrong just happened");
}
}
字符串
我的 Saga 中处理OrderRequestEvent
的部分
Initially(
When(OrderRequestedEvent)
.Then(context => LogContext.Info?.Log("Initializing saga: {0}", context.Saga.CorrelationId))
.InitializeSaga()
.Then(context => LogContext.Info?.Log("Validating Customer: {0}", context.Saga.CorrelationId))
.SendingToCustomerValidation().LogSaga()
.TransitionTo(ValidatingCustomer));
型
我注意到,即使我的异常在使用者中被正确地抛出, Saga 仍在正常地继续。显然,在使用者中抛出的异常对 Saga 没有任何影响。
1条答案
按热度按时间xtupzzrd1#
简单地说,这是正确的。如果你的终端没有某种形式的工作,消费者就不会“连接”到Sagas。我阅读你的代码的方式是,你有一个处理事件的saga(OrderRequestEvent)和一个消费者,而不是获得事件的自己的副本并做自己的事情。这是系统按预期工作。
如果您希望这些连接起来,那么您需要向 Saga 添加一些逻辑。最少的活动部分是构建一个包含与Consumer相同逻辑的Custom Saga Activity。然后,该Activity的成功或失败将遵循现有的 Saga 逻辑。
一般来说,您希望 Saga 活动快速(按照HTTP请求的顺序),所以如果验证需要依赖于第三方服务,我建议让 Saga 将其委托给消费者。这更接近于您在第一个问题中的内容。在这种情况下,您将遵循Saga Requests Docs并让 Saga 发出请求/你的消费者需要处理的响应,然后响应Request/Response Docs。在你现有的模型中,你只是抛出一个异常,这将作为一个错误被发送回请求,所以你会想在你的 Saga 中处理这个问题。
至于你发布的代码,完全有可能你已经在尝试这样做了,但是代码隐藏在像
SendingToCustomerValidation
和LogSaga
这样的方法后面。最后,这都是由MT支持的,并且经常使用。所以如果它不工作,它几乎总是一个配置问题。确保打开日志进行调试,看看发生了什么。:)