spring集成,为两个不同的进程使用一个通道

s71maibg  于 2021-07-23  发布在  Java
关注(0)|答案(1)|浏览(430)

我有一个问题,我不能设置进程去两个通道并行,它要么去一个或另一个。我有一个 ServiceActivator 还有一个 Transformer 具有相同的输入通道 objectOutputChannel . 去了医院之后 processChannel 它应该同时指向两个请求,但是它只指向其中一个请求,并且每个请求都不同。我该怎么做才能让它按我想要的方式工作?

@Bean
    public TcpReceivingChannelAdapter channelAdapter(AbstractServerConnectionFactory connectionFactory) {
        final MsgReceivingChannelAdapter adapter = new MsgReceivingChannelAdapter();
        adapter.setConnectionFactory(connectionFactory);
        adapter.setOutputChannel(messageChannel());
        return adapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "outputChannel")
    public TcpSendingMessageHandler messageHandler(AbstractServerConnectionFactory connectionFactory){
        final MsgSendingMessageHandler handler = new MsgSendingMessageHandler();
        handler.setConnectionFactory(connectionFactory);
        return handler;
    }

    @Bean
    @ServiceActivator(inputChannel = "objectOutputChannel")
    public AmqpOutboundEndpoint objectOutboundEndpoint() {
        return Amqp
                .outboundAdapter(rabbitTemplate)
                .exchangeName(objectExchange)
                .get();
    }

    @Bean
    @Transformer(inputChannel = "messageChannel", outputChannel = "loggingChannel")
    public ObjectToStringTransformer loggingTransformer() {
        return new ObjectToStringTransformer();
    }

    @ServiceActivator(inputChannel = "loggingChannel")
    public void loggingService(String message) {
        messageLogger.info(message);
    }

    @Bean
    @Transformer(inputChannel = "messageChannel", outputChannel = "processChannel")
    public ObjectDeserializer objectDeserializer() {
        return new ObjectDeserializer();
    }

    @ServiceActivator(inputChannel = "processChannel", outputChannel = "objectOutputChannel")
    public MyObject processService(MyObject object) {
        return objectService.check(object);
    }

    @Bean
    @Transformer(inputChannel = "objectOutputChannel", outputChannel = "outputChannel")
    public ObjectSerializer objectSerializer() {
        return new ObjectSerializer();
    }

    @Bean(name = "loggingChannel")
    public MessageChannel loggingChannel() {
        return new DirectChannel();
    }

    @Bean(name = "outputChannel")
    public MessageChannel outputChannel() {
        return new DirectChannel();
    }

    @Bean(name = "messageChannel")
    public MessageChannel messageChannel() {
        return new PublishSubscribeChannel();
    }

    @Bean(name = "objectOutputChannel")
    public MessageChannel objectOutputChannel() {
        return new PublishSubscribeChannel());

案例a的调试日志-不进入 objectOutboundEndpoint() 然后进入 objectSerializer() ```
2021-02-09 11:23:40.221 DEBUG 8964 --- [pool-4-thread-2] o.s.i.i.tcp.connection.TcpNetConnection : Message received GenericMessage [payload=...]
2021-02-09 11:23:40.223 DEBUG 8964 --- [pool-4-thread-2] o.s.i.channel.PublishSubscribeChannel : preSend on channel 'bean 'messageChannel'; defined in: 'class path resource [com/company/demo/core/controllers/ObjectTcpController.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@42f22995'', message: GenericMessage [payload=...]
2021-02-09 11:23:40.225 DEBUG 8964 --- [pool-4-thread-2] o.s.i.t.MessageTransformingHandler : bean 'objectTcpController.loggingTransformer.transformer.handler' received message: GenericMessage [payload=...]
2021-02-09 11:23:40.225 DEBUG 8964 --- [pool-4-thread-2] o.s.integration.channel.DirectChannel : preSend on channel 'bean 'loggingChannel'; defined in: 'class path resource [com/company/demo/core/controllers/ObjectTcpController.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@2b87581'', message: GenericMessage [payload=...]
2021-02-09 11:23:40.225 DEBUG 8964 --- [pool-4-thread-2] o.s.i.handler.ServiceActivatingHandler : ServiceActivator for [org.springframework.integration.handler.MethodInvokingMessageProcessor@5bf95bee] (objectTcpController.loggingService.serviceActivator.handler) received message: GenericMessage [payload=...]
2021-02-09 11:23:40.231 DEBUG 8964 --- [pool-4-thread-2] o.s.i.handler.ServiceActivatingHandler : handler 'ServiceActivator for [org.springframework.integration.handler.MethodInvokingMessageProcessor@5bf95bee] (objectTcpController.loggingService.serviceActivator.handler)' produced no reply for request Message: GenericMessage [payload=...]
2021-02-09 11:23:40.233 DEBUG 8964 --- [pool-4-thread-2] o.s.integration.channel.DirectChannel : postSend (sent=true) on channel 'bean 'loggingChannel'; defined in: 'class path resource [com/company/demo/core/controllers/ObjectTcpController.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@2b87581'', message: GenericMessage [payload=...]
2021-02-09 11:23:40.233 DEBUG 8964 --- [pool-4-thread-2] o.s.i.t.MessageTransformingHandler : bean 'objectTcpController.objectDeserializer.transformer.handler' received message: GenericMessage [payload=...]
2021-02-09 11:23:40.258 DEBUG 8964 --- [pool-4-thread-2] o.s.integration.channel.DirectChannel : preSend on channel 'bean 'processChannel'', message: GenericMessage [payload=...]
2021-02-09 11:23:40.258 DEBUG 8964 --- [pool-4-thread-2] o.s.i.handler.ServiceActivatingHandler : ServiceActivator for [org.springframework.integration.handler.MethodInvokingMessageProcessor@44e043f] (objectTcpController.processService.serviceActivator.handler) received message: GenericMessage [payload=..]
2021-02-09 11:23:42.383 DEBUG 8964 --- [pool-4-thread-2] o.s.integration.channel.DirectChannel : preSend on channel 'bean 'objectOutputChannel'; defined in: 'class path resource [com/company/demo/core/integration/spring/ObjectIntegrationConfiguration.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@6f8f8a80'', message: GenericMessage [payload=...]
2021-02-09 11:23:42.385 DEBUG 8964 --- [pool-4-thread-2] o.s.i.t.MessageTransformingHandler : bean 'objectTcpController.objectSerializer.transformer.handler' received message: GenericMessage [payload=...]
2021-02-09 11:23:42.388 DEBUG 8964 --- [pool-4-thread-2] o.s.integration.channel.DirectChannel : preSend on channel 'bean 'outputChannel'; defined in: 'class path resource [com/company/demo/core/controllers/ObjectTcpController.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@66434cc8'', message: GenericMessage [payload=...]
2021-02-09 11:23:42.388 DEBUG 8964 --- [pool-4-thread-2] .i.h.ReplyProducingMessageHandlerWrapper : bean 'objectTcpController.messageHandler.serviceActivator.handler' received message: GenericMessage [payload=...]
2021-02-09 11:23:42.388 DEBUG 8964 --- [pool-4-thread-2] o.s.i.ip.tcp.TcpSendingMessageHandler : bean 'messageHandler'; defined in: 'class path resource [com/company/demo/core/controllers/ObjectTcpController.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@38499e48' received message: GenericMessage [payload=...]
2021-02-09 11:23:42.389 DEBUG 8964 --- [pool-4-thread-2] .i.h.ReplyProducingMessageHandlerWrapper : handler 'bean 'objectTcpController.messageHandler.serviceActivator.handler'' produced no reply for request Message: GenericMessage [payload=...]
2021-02-09 11:23:42.390 DEBUG 8964 --- [pool-4-thread-2] o.s.integration.channel.DirectChannel : postSend (sent=true) on channel 'bean 'outputChannel'; defined in: 'class path resource [com/company/demo/core/controllers/ObjectTcpController.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@66434cc8'', message: GenericMessage [payload=...]
2021-02-09 11:23:42.390 DEBUG 8964 --- [pool-4-thread-2] o.s.integration.channel.DirectChannel : postSend (sent=true) on channel 'bean 'objectOutputChannel'; defined in: 'class path resource [com/company/demo/core/integration/spring/ObjectIntegrationConfiguration.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@6f8f8a80'', message: GenericMessage [payload=...]
2021-02-09 11:23:42.391 DEBUG 8964 --- [pool-4-thread-2] o.s.integration.channel.DirectChannel : postSend (sent=true) on channel 'bean 'processChannel'', message: GenericMessage [payload=...]
2021-02-09 11:23:42.391 DEBUG 8964 --- [pool-4-thread-2] o.s.i.channel.PublishSubscribeChannel : postSend (sent=true) on channel 'bean 'messageChannel'; defined in: 'class path resource [com/company/demo/core/controllers/ObjectTcpController.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@42f22995'', message: GenericMessage [payload=...]

案例b的调试日志-不进入 `objectSerializer()` 然后进入 `objectOutboundEndpoint()` ```
2021-02-09 11:28:19.173 DEBUG 8964 --- [pool-4-thread-3] o.s.i.i.tcp.connection.TcpNetConnection  : Message received GenericMessage [payload=...]
2021-02-09 11:28:19.173 DEBUG 8964 --- [pool-4-thread-3] o.s.i.channel.PublishSubscribeChannel    : preSend on channel 'bean 'messageChannel'; defined in: 'class path resource [com/company/demo/core/controllers/ObjectTcpController.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@42f22995'', message: GenericMessage [payload=...]
2021-02-09 11:28:19.173 DEBUG 8964 --- [pool-4-thread-3] o.s.i.t.MessageTransformingHandler       : bean 'objectTcpController.loggingTransformer.transformer.handler' received message: GenericMessage [payload=...]
2021-02-09 11:28:19.174 DEBUG 8964 --- [pool-4-thread-3] o.s.integration.channel.DirectChannel    : preSend on channel 'bean 'loggingChannel'; defined in: 'class path resource [com/company/demo/core/controllers/ObjectTcpController.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@2b87581'', message: GenericMessage [payload=...]
2021-02-09 11:28:19.174 DEBUG 8964 --- [pool-4-thread-3] o.s.i.handler.ServiceActivatingHandler   : ServiceActivator for [org.springframework.integration.handler.MethodInvokingMessageProcessor@5bf95bee] (objectTcpController.loggingService.serviceActivator.handler) received message: GenericMessage [payload=...]
2021-02-09 11:28:19.174 DEBUG 8964 --- [pool-4-thread-3] o.s.i.handler.ServiceActivatingHandler   : handler 'ServiceActivator for [org.springframework.integration.handler.MethodInvokingMessageProcessor@5bf95bee] (objectTcpController.loggingService.serviceActivator.handler)' produced no reply for request Message: GenericMessage [payload=...]
2021-02-09 11:28:19.174 DEBUG 8964 --- [pool-4-thread-3] o.s.integration.channel.DirectChannel    : postSend (sent=true) on channel 'bean 'loggingChannel'; defined in: 'class path resource [com/company/demo/core/controllers/ObjectTcpController.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@2b87581'', message: GenericMessage [payload=...]
2021-02-09 11:28:19.174 DEBUG 8964 --- [pool-4-thread-3] o.s.i.t.MessageTransformingHandler       : bean 'objectTcpController.objectDeserializer.transformer.handler' received message: GenericMessage [payload=...]
2021-02-09 11:28:19.175 DEBUG 8964 --- [pool-4-thread-3] o.s.integration.channel.DirectChannel    : preSend on channel 'bean 'processChannel'', message: GenericMessage [payload=...]
2021-02-09 11:28:19.175 DEBUG 8964 --- [pool-4-thread-3] o.s.i.handler.ServiceActivatingHandler   : ServiceActivator for [org.springframework.integration.handler.MethodInvokingMessageProcessor@44e043f] (objectTcpController.processService.serviceActivator.handler) received message: GenericMessage [payload=...]
2021-02-09 11:28:22.572 DEBUG 8964 --- [pool-4-thread-3] o.s.integration.channel.DirectChannel    : preSend on channel 'bean 'objectOutputChannel'; defined in: 'class path resource [com/company/demo/core/integration/spring/ObjectIntegrationConfiguration.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@6f8f8a80'', message: GenericMessage [payload=...]
2021-02-09 11:28:22.572 DEBUG 8964 --- [pool-4-thread-3] o.s.i.a.outbound.AmqpOutboundEndpoint    : bean 'objectOutboundEndpoint'; defined in: 'class path resource [com/company/demo/core/integration/spring/ObjectIntegrationConfiguration.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@1a5b8489' received message: GenericMessage [payload=...]
2021-02-09 11:28:22.574 DEBUG 8964 --- [pool-4-thread-3] s.i.m.AbstractHeaderMapper$HeaderMatcher : headerName=[ip_tcp_remotePort] WILL be mapped, matched pattern=*
2021-02-09 11:28:22.574 DEBUG 8964 --- [pool-4-thread-3] s.i.m.AbstractHeaderMapper$HeaderMatcher : headerName=[ip_connectionId] WILL be mapped, matched pattern=*
2021-02-09 11:28:22.574 DEBUG 8964 --- [pool-4-thread-3] s.i.m.AbstractHeaderMapper$HeaderMatcher : headerName=[ip_localInetAddress] WILL be mapped, matched pattern=*
2021-02-09 11:28:22.575 DEBUG 8964 --- [pool-4-thread-3] s.i.m.AbstractHeaderMapper$HeaderMatcher : headerName=[ip_address] WILL be mapped, matched pattern=*
2021-02-09 11:28:22.575 DEBUG 8964 --- [pool-4-thread-3] s.i.m.AbstractHeaderMapper$HeaderMatcher : headerName=[history] WILL be mapped, matched pattern=*
2021-02-09 11:28:22.575 DEBUG 8964 --- [pool-4-thread-3] s.i.m.AbstractHeaderMapper$HeaderMatcher : headerName=[ip_hostname] WILL be mapped, matched pattern=*
2021-02-09 11:28:22.635 DEBUG 8964 --- [pool-4-thread-3] o.s.i.a.outbound.AmqpOutboundEndpoint    : handler 'bean 'objectOutboundEndpoint'; defined in: 'class path resource [com/company/demo/core/integration/spring/ObjectIntegrationConfiguration.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@1a5b8489'' produced no reply for request Message: GenericMessage [payload=...]
2021-02-09 11:28:22.635 DEBUG 8964 --- [pool-4-thread-3] o.s.integration.channel.DirectChannel    : postSend (sent=true) on channel 'bean 'objectOutputChannel'; defined in: 'class path resource [com/company/demo/core/integration/spring/ObjectIntegrationConfiguration.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@6f8f8a80'', message: GenericMessage [payload=...]
2021-02-09 11:28:22.639 DEBUG 8964 --- [pool-4-thread-3] o.s.integration.channel.DirectChannel    : postSend (sent=true) on channel 'bean 'processChannel'', message: GenericMessage [payload=..]
2021-02-09 11:28:22.639 DEBUG 8964 --- [pool-4-thread-3] o.s.i.channel.PublishSubscribeChannel    : postSend (sent=true) on channel 'bean 'messageChannel'; defined in: 'class path resource [com/company/demo/core/controllers/ObjectTcpController.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@42f22995'', message: GenericMessage [payload=...]
7vhp5slm

7vhp5slm1#

你所描述的不是关于 objectOutputChannel 或者 messageChannel 因为他们都是 PublishSubscribeChannel 类示例。你所描述的是一些 DirectChannel 它的循环默认调度策略。但既然我们没有几个订户直接收看你给我们看的频道,那么问题就出在别的地方了。
我建议您打开消息历史记录并调查的调试日志 org.springframework.integration 类别以查看消息如何在流中传播。在那里你可以找出哪个渠道是有罪的。
您在代码片段中显示的内容是正确的,看起来与您描述的内容不相关。。。
更新
我们在您的日志中看到:

o.s.integration.channel.DirectChannel    : postSend (sent=true) on channel 'bean 'objectOutputChannel'; defined in: 'class path resource [com/company/demo/core/integration/spring/ObjectIntegrationConfiguration.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@6f8f8a80'', message: GenericMessage [payload=...]

所以,你的 objectOutputChannel 真的是一个 DirectChannel 示例及其bean在 ObjectIntegrationConfiguration . 如果该配置不是您在问题中显示的配置,那么您就有一个“bean定义覆盖”竞争条件和 PublishSubscribeChannel 被它覆盖了 DirectChannel . 请修改您的项目配置,并尝试找到您声明的其他位置 objectOutputChannel 豆子。

相关问题