我对spring集成还很陌生。我正在评估我们项目的spring集成。我遇到了一个关于如何处理异常的问题。
我正在使用PublishSubscribeBchannel处理消息。我不确定这是不是一个正确的方法。当publishsubscribechannel中抛出异常时,我希望它路由到不同的通道,以便我可以回复不同的http状态代码。
如何将PublishSubscribeBchannel中的异常路由到errorchannel。
我有以下代码。我试过使用 routeException
在另一个领域的代码,但没有运气。有人能帮我解决这个问题吗?
@Configuration
@EnableIntegration
public class IntegrationConfiguration {
// curl http://localhost:8080/tasks --data '{"username":"xyz","password":"xyz"}' -H 'Content-type: application/json'
@Bean
MessageChannel directChannel() {
return MessageChannels.direct().get();
}
@Bean
public IntegrationFlow httpGateway() {
return IntegrationFlows.from(
Http.inboundGateway("/tasks")
.requestMapping(m -> m.methods(HttpMethod.POST))
.requestPayloadType(String.class)
.requestChannel(directChannel())
.get()
)
.transform(t -> {
return "transofrm " + t;
})
.channel("queueChannel")
.routeByException(r -> {
r.channelMapping(RuntimeException.class, "errorChannel");
r.defaultOutputToParentFlow();
})
.get();
}
@Bean
public IntegrationFlow handleMessage() {
return IntegrationFlows.from("queueChannel")
.wireTap(flow -> flow.handle(System.out::println))
.routeByException(r -> {
r.channelMapping(RuntimeException.class, "errorChannel");
r.defaultOutputToParentFlow();
})
.publishSubscribeChannel(publisher -> {
publisher.errorHandler(var1 -> {
var1.printStackTrace();
})
.subscribe(flow -> flow
.handle(m -> {
if (m.getPayload().toString().contains("user")) {
throw new RuntimeException("user found");
}
System.out.println("subscribed " + m.getPayload());
})
);
}
)
.transform(t -> "")
.wireTap(flow -> flow.handle(m -> {
System.out.println(m.getHeaders().get("status"));
}))
.enrichHeaders( c -> c.header(HttpHeaders.STATUS_CODE, HttpStatus.OK))
.get();
}
@Bean
IntegrationFlow exceptionOrErrorFlow() {
return IntegrationFlows.from("errorChannel")
.wireTap(f -> f.handle(m -> System.out.println("failed badly")))
.enrichHeaders(c -> c.header(HttpHeaders.STATUS_CODE, HttpStatus.BAD_REQUEST))
.get();
}
}
1条答案
按热度按时间puruo6ea1#
你想做什么还不完全清楚。通常不应该在这样的流中使用队列通道。
只需添加一个
.errorChannel
到入站网关,任何下游异常都将发送到该通道,您可以向该通道添加一个订户来处理异常。另外,你不应该打电话
get()
在内部声明的规范(不是bean的规范)上,使用.from()
直接接受…规格。否则bean将无法正确初始化。