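We are trying to use Kafka together with a WSO2 ESB server.
We have implemented an API that puts incoming messages into Kafka.
We then implemented an inbound endpoint that retrieves messages from Kafka and forwards them to another external system.
Everything works fine, but when we tested the "external system is down" scenario, the messages that failed were not delivered once the external system came back up.
How can we resend the failed messages to the external system?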
API configuration:
<?xml version="1.0" encoding="UTF-8"?>
<api context="/api/event" name="EventAPI" xmlns="http://ws.apache.org/ns/synapse">
    <resource methods="POST">
        <inSequence>
            <log category="DEBUG" description="" level="full"/>
            <kafkaTransport.init>
                <bootstrapServers>localhost:9092</bootstrapServers>
                <keySerializerClass>org.apache.kafka.common.serialization.StringSerializer</keySerializerClass>
                <valueSerializerClass>org.apache.kafka.common.serialization.StringSerializer</valueSerializerClass>
                <maxPoolSize>20</maxPoolSize>
            </kafkaTransport.init>
            <kafkaTransport.publishMessages>
                <topic>event_topic</topic>
            </kafkaTransport.publishMessages>
            <loopback/>
        </inSequence>
        <outSequence>
            <payloadFactory media-type="json">
                <format>{"result" : "OK"}</format>
            </payloadFactory>
            <property name="messageType" scope="axis2" type="STRING" value="application/json"/>
            <send/>
        </outSequence>
    </resource>
</api>
Inbound endpoint configuration:
<?xml version="1.0" encoding="UTF-8"?>
<inboundEndpoint name="EventTransmitter" protocol="kafka"
                 sequence="transmit_sequence" suspend="false" onError="fault"
                 xmlns="http://ws.apache.org/ns/synapse">
    <parameters>
        <parameter name="interval">10</parameter>
        <parameter name="coordination">true</parameter>
        <parameter name="sequential">true</parameter>
        <parameter name="zookeeper.connect">localhost:2181</parameter>
        <parameter name="consumer.type">highlevel</parameter>
        <parameter name="content.type">application/json</parameter>
        <parameter name="topics">event_topic</parameter>
        <parameter name="group.id">myconsumer</parameter>
        <parameter name="consumer.id">myconsumer</parameter>
        <parameter name="dual.commit.enabled">true</parameter>
        <parameter name="auto.offset.reset">largest</parameter>
    </parameters>
</inboundEndpoint>
Sequence:
<?xml version="1.0" encoding="UTF-8"?>
<sequence name="transmit_sequence" onError="fault" trace="disable" xmlns="http://ws.apache.org/ns/synapse">
    <send receive="event_transmit_out_sequence">
        <endpoint key="gov:endpoints/HandlerEndpoint.xml"/>
    </send>
</sequence>
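For context on the redelivery question, one direction that might apply is the ESB's store-and-forward pattern: the sequence stores each message in a message store instead of sending it directly, and a scheduled message forwarding processor retries delivery to the endpoint. The following is only a rough sketch and has not been tested against this setup. The names `EventStore` and `EventForwarder` are hypothetical, and it assumes the endpoint is also available as a named endpoint `HandlerEndpoint` and that the ESB version supports the `targetEndpoint` attribute. The in-memory store is used for brevity and would not survive a server restart. Each artifact would normally live in its own file.

```xml
<!-- Hypothetical message store; in-memory is for illustration only -->
<messageStore name="EventStore"
              class="org.apache.synapse.message.store.impl.memory.InMemoryStore"
              xmlns="http://ws.apache.org/ns/synapse"/>

<!-- transmit_sequence stores the message instead of sending it directly -->
<sequence name="transmit_sequence" onError="fault" trace="disable"
          xmlns="http://ws.apache.org/ns/synapse">
    <store messageStore="EventStore"/>
</sequence>

<!-- Hypothetical processor that forwards stored messages and retries on failure -->
<messageProcessor name="EventForwarder"
                  class="org.apache.synapse.message.processor.impl.forwarder.ScheduledMessageForwardingProcessor"
                  messageStore="EventStore"
                  targetEndpoint="HandlerEndpoint"
                  xmlns="http://ws.apache.org/ns/synapse">
    <!-- Polling interval of the processor, in milliseconds -->
    <parameter name="interval">1000</parameter>
    <!-- Wait between retries of a failed delivery, in milliseconds -->
    <parameter name="client.retry.interval">5000</parameter>
    <!-- Illustrative value; choose according to the expected downtime -->
    <parameter name="max.delivery.attempts">10</parameter>
    <!-- Responses are handed to the existing out sequence -->
    <parameter name="message.processor.reply.sequence">event_transmit_out_sequence</parameter>
    <parameter name="is.active">true</parameter>
</messageProcessor>
```

Whether this fits depends on the delivery guarantees required. With this sketch the Kafka offset would presumably be committed once the message is stored, so durability would rest on the message store rather than on Kafka.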