publishersubscribechannel中的spring集成处理异常

oprakyz7  于 2021-07-22  发布在  Java
关注(0)|答案(1)|浏览(391)

我对spring集成还很陌生。我正在评估我们项目的spring集成。我遇到了一个关于如何处理异常的问题。
我正在使用PublishSubscribeBchannel处理消息。我不确定这是不是一个正确的方法。当publishsubscribechannel中抛出异常时,我希望它路由到不同的通道,以便我可以回复不同的http状态代码。
如何将PublishSubscribeBchannel中的异常路由到errorchannel。
我有以下代码。我试过使用 routeException 在另一个领域的代码,但没有运气。有人能帮我解决这个问题吗?

@Configuration
@EnableIntegration
public class IntegrationConfiguration {

    // curl http://localhost:8080/tasks --data '{"username":"xyz","password":"xyz"}' -H 'Content-type: application/json'
    @Bean
    MessageChannel directChannel() {
        return MessageChannels.direct().get();
    }

    @Bean
    public IntegrationFlow httpGateway() {
        return IntegrationFlows.from(
            Http.inboundGateway("/tasks")
                .requestMapping(m -> m.methods(HttpMethod.POST))
                .requestPayloadType(String.class)
                .requestChannel(directChannel())
            .get()
        )
            .transform(t -> {
                return "transofrm " + t;
            })
            .channel("queueChannel")
            .routeByException(r -> {
                r.channelMapping(RuntimeException.class, "errorChannel");
                r.defaultOutputToParentFlow();
            })
            .get();
    }

    @Bean
    public IntegrationFlow handleMessage() {
        return IntegrationFlows.from("queueChannel")
            .wireTap(flow -> flow.handle(System.out::println))
            .routeByException(r -> {
                r.channelMapping(RuntimeException.class, "errorChannel");
                r.defaultOutputToParentFlow();
            })
            .publishSubscribeChannel(publisher -> {
                publisher.errorHandler(var1 -> {
                    var1.printStackTrace();
                })
                    .subscribe(flow -> flow
                        .handle(m -> {
                            if (m.getPayload().toString().contains("user")) {
                                throw new RuntimeException("user found");
                            }
                            System.out.println("subscribed " + m.getPayload());
                        })
                    );
                }
            )

            .transform(t -> "")
            .wireTap(flow -> flow.handle(m -> {
                System.out.println(m.getHeaders().get("status"));
            }))
            .enrichHeaders( c -> c.header(HttpHeaders.STATUS_CODE, HttpStatus.OK))
            .get();
    }

    @Bean
    IntegrationFlow exceptionOrErrorFlow() {
        return IntegrationFlows.from("errorChannel")
            .wireTap(f -> f.handle(m -> System.out.println("failed badly")))
            .enrichHeaders(c -> c.header(HttpHeaders.STATUS_CODE, HttpStatus.BAD_REQUEST))
            .get();
    }
}
puruo6ea

puruo6ea1#

你想做什么还不完全清楚。通常不应该在这样的流中使用队列通道。
只需添加一个 .errorChannel 到入站网关,任何下游异常都将发送到该通道,您可以向该通道添加一个订户来处理异常。
另外,你不应该打电话 get() 在内部声明的规范(不是bean的规范)上,使用 .from() 直接接受…规格。否则bean将无法正确初始化。

return IntegrationFlows.from(
        Http.inboundGateway("/tasks")
            .requestMapping(m -> m.methods(HttpMethod.POST))
            .requestPayloadType(String.class)
            .requestChannel(directChannel())
            .errorChannel("myErrors")
    )
    ...

相关问题