所以,我一定是漏掉了什么,在我看来,即使事件没有发生,我的状态机也会阻止执行。这是我的状态机:
public class OrderStateMachine : MassTransitStateMachine<NormalScanState>
{
public OrderStateMachine()
{
Event(() => OrderScannedEvent, x => x.CorrelateById(context => context.Message.CorrelationId));
Event(() => OrderDataRetrieved,
x => x.CorrelateById(context => context.Message.CorrelationId));
Event(() => SortingGeneratedEvent,
x => x.CorrelateById(context => new Guid(context.GetHeader("correlation-id")!)));
InstanceState(x => x.CurrentState);
Initially(
When(OrderScannedEvent)
.Then(context =>
{
context.Saga.CorrelationId = context.Message.CorrelationId;
context.Saga.OrderId = context.Message.OrderId;
context.Saga.OrderData.TrackingNumber = context.Message.TrackingNumber;
})
.Activity(x => x.OfType<GetOrderActivity>())
.TransitionTo(GetOrderSubmitted));
During(GetOrderSubmitted,
Ignore(OrderScannedEvent),
Ignore(SortingGeneratedEvent),
When(OrderDataRetrieved)
.Publish(context => new GenerateSorting
{
Height = context.Saga.OrderData.Height,
Length = context.Saga.OrderData.Length,
}
, AddHeaders)
.TransitionTo(GenerateSortingSubmitted));
During(GenerateSortingSubmitted,
Ignore(OrderScannedEvent),
Ignore(OrderDataRetrieved),
When(SortingGeneratedEvent)
.Then(context =>
{
context.Saga.OrderData.SortCode = context.Message.SortCode;
context.Saga.OrderData.RouteCode = context.Message.RouteCode;
context.Saga.OrderData.Carrier = context.Message.Carrier;
})
.TransitionTo(Accepted));
}
public State GetOrderSubmitted { get; private set; }
public State GenerateSortingSubmitted { get; private set; }
public State Accepted { get; private set; }
public Event<OrderSubmitted>? OrderSubmittedEvent { get; private set; }
public Event<OrderDataRetrieved> OrderDataRetrieved { get; private set; }
public Event<SortingGenerated> SortingGeneratedEvent { get; private set; }
private void AddHeaders(PublishContext publishContext)
{
var consumeContext = publishContext.GetPayload<ConsumeContext>();
var correlationId = consumeContext.CorrelationId;
publishContext.Headers.Set("correlation-id", correlationId);
}
}
字符串
即使OrderDataRetrieved和GenerateSortingSubmitted事件没有发生,During(GetOrderSubmitted)和During(GenerateSortingSubmitted)中的代码也会被执行。
这是我的安装程序:
public void InstallServices(IServiceCollection services, IConfiguration configuration)
{
services.AddMassTransit(cfg =>
{
var entryAssembly = Assembly.GetEntryAssembly();
cfg.SetKebabCaseEndpointNameFormatter();
cfg.SetInMemorySagaRepositoryProvider();
cfg.AddConsumers(entryAssembly);
cfg.AddSagaStateMachines(entryAssembly);
cfg.AddSagas(entryAssembly);
cfg.AddActivities(entryAssembly);
cfg.UsingRabbitMq((context, configurator) =>
{
configurator.Host(queueConfig.ConnectionString);
configurator.DeployPublishTopology = true;
configurator.ClearSerialization();
configurator.UseRawJsonSerializer(RawSerializerOptions.AnyMessageType);
configurator.ConfigureEndpoints(context);
});
});
}
的
我错过了什么?
1条答案
按热度按时间niknxzdl1#
第一个月
这不适用于使用多个事件类型的 Saga ,因为它将把每个入站消息都视为与状态机中的每个事件匹配的消息。
文档介绍了原始JSON的工作原理。