有人问过这个问题,但我认为没有人回答。唯一的答案是aggregator如何使用correlationid。但真正的问题是如何在不检查回复中的jobexecutionid的情况下更新作业状态。我没有足够的声誉来评论现有的问题,所以再问一次。
根据javadoc MessageChannelPartitionHandler
它应该是步骤或作业范围。在我们使用的远程分区场景中 RemotePartitioningManagerStepBuilder
生成不允许设置partitionhandler的管理器步骤。假设每个作业将在rabbitmq上使用相同的队列,则当接收到工作节点回复时,消息将被交叉。有没有简单的方法来重现这一点,但我可以看到这种行为使用一些手动步骤如下
启动第一个作业
在worker可以回复之前杀死manager节点
让工作节点完成对所有分区的处理,并在rabbitmq上发送回复
再次启动管理器节点并启动新作业
有某种机制使第二个作业失败,即读写器显式失败
检查2个作业的状态
预期结果:作业1标记为已完成,作业2标记为失败
实际结果:job-1保持在started状态,job-2被标记为completed,尽管其工作步骤被标记为failed
下面是显示如何配置管理器和工作器步骤的示例代码
@Bean
public Step importDataStep(RemotePartitioningManagerStepBuilderFactory managerStepBuilderFactory) {
return managerStepBuilderFactory.get()
.<String, String>partitioner("worker", partitioner())
.gridSize(2)
.outputChannel(outgoingRequestsToWorkers)
.inputChannel(incomingRepliesFromWorkers)
.listener(stepExecutionListener)
.build();
}
@Bean
public Step worker(
RemotePartitioningWorkerStepBuilderFactory workerStepBuilderFactory) {
return workerStepBuilderFactory.get("worker")
.listener(stepExecutionListener)
.inputChannel(incomingRequestsFromManager())
.outputChannel(outgoingRepliesToManager())
.<String, String>chunk(10)
.reader(itemReader())
.processor(itemProcessor())
.writer(itemWriter());
}
或者,我可以考虑在不发生消息交叉的情况下使用轮询而不是回复。但是,如果管理器节点在工作节点处理时崩溃,则无法重新启动轮询。如果我使用轮询执行上述步骤
实际结果:作业1仍处于启动状态,作业2按预期标记为失败
在轮询的情况下不会发生此问题,因为每个轮询器都使用准确的jobexecutionid来轮询和更新相应的管理器步骤/作业。
我做错什么了?有没有更好的方法来处理这种情况?
暂无答案!
目前还没有任何答案,快来回答吧!