如何将消息从kafka接收器路由到多个主题

e0uiprwp  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(470)

我有一个springxd http处理器模块,它带有一个errorchannel和outputchannel的http出站网关。任何带有HTTP200的消息都会到达outputchannel,其余消息都会到达failurechannel。
现在,http处理器模块通过带有topicx的kafka出站适配器连接到kafka接收器。topicx只接收http200消息进行进一步处理。现在,我们需要将failurechannel中的消息路由到topicy。
如何在Kafka接收器中发送多个Kafka主题的消息。我在消息头中有httpstatuscode。我的项目中使用的kafka版本是0.8.2,java版本是1.7

<!-- http-processor-config -->
<int-http:outbound-gateway
        request-channel="input"
        url-expression="'myUrlLink'"
        http-method="POST"
        expected-response-type="java.lang.String"
        charset="UTF-8"
        reply-timeout="10"
        reply-channel="output">

        <int-http:request-handler-advice-chain>
                    <bean class="org.springframework.integration.handler.advice.RequestHandlerRetryAdvice">
                        <property name="recoveryCallback">
                            <bean class="org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer">
                                <constructor-arg ref="errorChannel" />
                            </bean>
                        </property>
                        <property name="retryTemplate" ref="retryTemplate" />
                    </bean>
        </int-http:request-handler-advice-chain>

</int-http:outbound-gateway>

<!-- Handle failed messages and route to failureChannel for specific http codes-->
<int:service-activator input-channel="errorChannel" ref="customErrorHandler" method="handleFailedRequest" output-channel="failureChannel"/>

关于KafkaFlume,我有以下生产者的背景:

<int-kafka:producer-context id="kafkaProducerContext">
    <int-kafka:producer-configurations>
        <int-kafka:producer-configuration broker-list="localhost:9092"
                                          topic="${topicX}"
                                          key-class-type="java.lang.String"
                                          key-serializer="serializer"
                                          value-class-type="[B"
                                          value-serializer="valueSerializer"/>
    </int-kafka:producer-configurations>
</int-kafka:producer-context>
bttbmeg0

bttbmeg01#

这是真的,它不被支持,也不会被支持。 Spring xd今年已经下线了。鼓励大家迁移到spring云数据流。
对于您的用例,您可以编辑kafka接收器模块配置。再加一个 <int-kafka:outbound-channel-adapter> 这是另一个主题。要决定将传入消息发送到哪个主题,可以添加 <router> 到此配置。
或者只是考虑使用 Router Sink . 并为每个消息类型和每个主题提供两个独立的流。

jei2mxaa

jei2mxaa2#

我终于成功了。现在我找到了一个0.8.x版本的解决方案,在http处理器模块中添加了一个拆分器,并在消息头中添加了一个kafka\u主题变量。基于http状态码,我只是设置了不同的主题。
在kafka sink上,我添加了另一个producer配置,使用新的topic name变量,通过xd params设置。我想不出任何其他解决方案,因为我在多个流中重用kafka源和kafka接收器模块。
这个特定的kafka接收器将输入发送到另一个xd流。因此,添加了一个头过滤器,以便在下一个流开始时删除kafka源模块中的kafka\u主题。
阅读更多:http://docs.spring.io/autorepo/docs/spring-kafka-dist/1.0.2.release/reference/html/_spring_integration.html
寻找设定Kafka主题的台词。这是关键。

相关问题