wso2 kafka入站终结点出错时重试

hvvq6cgz  于 2021-06-07  发布在  Kafka
关注(0)|答案(0)|浏览(240)

我们尝试将kafka与wso2 esb服务器结合使用。
我们已经实现了将传入消息放入kafka的api。
然后我们实现了一个入站端点,它从kafka检索消息并将这些消息传输到其他外部系统。
一切都很顺利,但当我们测试“外部系统关闭”场景时,失败的消息不会在外部系统再次启动时传递。
我们如何向外部系统发送失败的消息?
api配置:

<?xml version="1.0" encoding="UTF-8"?>
<api context="/api/event" name="EventAPI" xmlns="http://ws.apache.org/ns/synapse">
    <resource methods="POST">
        <inSequence>
            <log category="DEBUG" description="" level="full"/>
            <kafkaTransport.init>
                <bootstrapServers>localhost:9092</bootstrapServers>
                <keySerializerClass>org.apache.kafka.common.serialization.StringSerializer</keySerializerClass>
                <valueSerializerClass>org.apache.kafka.common.serialization.StringSerializer</valueSerializerClass>
                <maxPoolSize>20</maxPoolSize>
            </kafkaTransport.init>
            <kafkaTransport.publishMessages>
                <topic>event_topic</topic>
            </kafkaTransport.publishMessages>
            <loopback/>
        </inSequence>
        <outSequence>
            <payloadFactory media-type="json">
                <format>{"result" : "OK"}</format>
            </payloadFactory>
            <property name="messageType" scope="axis2" type="STRING" value="application/json"/>
            <send/>
        </outSequence>            
    </resource>
</api>

入站配置:

<?xml version="1.0" encoding="UTF-8"?>
<inboundEndpoint name="EventTransmitter" protocol="kafka"
    sequence="transmit_sequence" suspend="false" onError="fault"
    xmlns="http://ws.apache.org/ns/synapse">
    <parameters>

        <parameter name="interval">10</parameter>
        <parameter name="coordination">true</parameter>
        <parameter name="sequential">true</parameter>
        <parameter name="zookeeper.connect">localhost:2181</parameter>
        <parameter name="consumer.type">highlevel</parameter>
        <parameter name="content.type">application/json</parameter>
        <parameter name="topics">event_topic</parameter>
        <parameter name="group.id">myconsumer</parameter>
        <parameter name="consumer.id">myconsumer</parameter>

        <parameter name="dual.commit.enabled">true</parameter>
        <parameter name="auto.offset.reset">largest</parameter>

    </parameters>
</inboundEndpoint>

顺序:

<?xml version="1.0" encoding="UTF-8"?>
<sequence name="transmit_sequence" onError="fault" trace="disable" xmlns="http://ws.apache.org/ns/synapse">

    <send receive="event_transmit_out_sequence">
        <endpoint key="gov:endpoints/HandlerEndpoint.xml"/>
    </send>

</sequence>

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题