我的工作流中有一个切片,需要等待,直到我在同一个信号通道上接收到切片中每个元素的信号。我尝试使用以下代码,但它似乎没有等待接收到所有消息。
selector := workflow.NewSelector(ctx)
notificationSignalChan := workflow.GetSignalChannel(ctx, "my-channel")
for i := 0; i < len(container.Items); i++ {
if container.Items[i].Status != status.Pending {
continue
}
var expectedNotification events.Notification
selector.AddReceive(notificationSignalChan, func(c workflow.ReceiveChannel, more bool) {
// So it has to be explicitly consumed here
c.Receive(ctx, &expectedNotification)
idx := slices.IndexFunc(container.Items, func(item *model.Item) bool {
return item.ID == notification.ItemID
})
recordedAt := workflow.Now(ctx)
container.Items[idx].Status = status.Processed
err = workflow.ExecuteActivity(ctx, activities.OnProcessed, container.Items[idx]).Get(ctx, nil)
if err != nil {
panic(err)
}
})
}
for i := 0; i < len(container.Items); i++ {
if container.Items[i].Status != status.Pending {
continue
}
selector.Select(ctx)
}
1条答案
按热度按时间ttp71kqs1#
你可以有一个死循环和一个select语句,这样当你从channel得到一个对象时,只要和list中的项比较,然后从list中删除那个项,检查list中是否有任何项,continue else,只要中断for循环。