我已经在windows工作站上安装了wso2 integration studio版本6.5.0,并使用kafka consumer和producer内置模板创建了一个项目。
api.xml文件:
<?xml version="1.0" encoding="UTF-8"?>
<api context="/publishweatherdata" name="WeatherDataPublishAPI" xmlns="http://ws.apache.org/ns/synapse">
<resource methods="POST">
<inSequence>
<kafkaTransport.init>
<bootstrapServers>localhost:9092</bootstrapServers>
<keySerializerClass>org.apache.kafka.common.serialization.StringSerializer</keySerializerClass>
<valueSerializerClass>org.apache.kafka.common.serialization.StringSerializer</valueSerializerClass>
<acks>all</acks>
<requestTimeout>10000</requestTimeout>
<timeout>8000</timeout>
<metadataFetchTimeout>5000</metadataFetchTimeout>
<maxPoolSize>50</maxPoolSize>
</kafkaTransport.init>
<kafkaTransport.publishMessages>
<topic>weatherdatatopic</topic>
</kafkaTransport.publishMessages>
<payloadFactory media-type="json">
<format>
{"topic":"$1", "partition":"$2", "offset":"$3"}
</format>
<args>
<arg evaluator="xml" expression="$ctx:topic"/>
<arg evaluator="xml" expression="$ctx:partition"/>
<arg evaluator="xml" expression="$ctx:offset"/>
</args>
</payloadFactory>
<property name="messageType" scope="axis2" type="STRING" value="application/json"/>
<respond/>
</inSequence>
<outSequence/>
<faultSequence/>
</resource>
</api>
weatherdatatransmitinboundep.xml文件:
<?xml version="1.0" encoding="UTF-8"?>
<inboundEndpoint class="org.wso2.carbon.inbound.kafka.KafkaMessageConsumer" name="WeatherDataTransmitInboundEP" onError="WeatherDataErrorSeq" sequence="WeatherDataProcessSeq" suspend="false" xmlns="http://ws.apache.org/ns/synapse">
<parameters>
<parameter name="sequential">true</parameter>
<parameter name="interval">10</parameter>
<parameter name="coordination">true</parameter>
<parameter name="inbound.behavior">polling</parameter>
<parameter name="value.deserializer">org.apache.kafka.common.serialization.StringDeserializer</parameter>
<parameter name="topic.name">weatherdatatopic</parameter>
<parameter name="poll.timeout">100</parameter>
<parameter name="bootstrap.servers">localhost:9092</parameter>
<parameter name="group.id">hello</parameter>
<parameter name="contentType">application/json</parameter>
<parameter name="key.deserializer">org.apache.kafka.common.serialization.StringDeserializer</parameter>
<parameter name="class">org.wso2.carbon.inbound.kafka.KafkaMessageConsumer</parameter>
</parameters>
</inboundEndpoint>
weatherdatapublishservice.xml:
<?xml version="1.0" encoding="UTF-8"?>
<proxy name="WeatherDataPublishService" startOnLoad="true" transports="http https" xmlns="http://ws.apache.org/ns/synapse">
<target>
<inSequence>
<kafkaTransport.init>
<bootstrapServers>localhost:9092</bootstrapServers>
<keySerializerClass>org.apache.kafka.common.serialization.StringSerializer</keySerializerClass>
<valueSerializerClass>org.apache.kafka.common.serialization.StringSerializer</valueSerializerClass>
<acks>all</acks>
<requestTimeout>10000</requestTimeout>
<timeout>8000</timeout>
<metadataFetchTimeout>5000</metadataFetchTimeout>
<maxPoolSize>50</maxPoolSize>
</kafkaTransport.init>
<kafkaTransport.publishMessages>
<topic>weatherdatatopic</topic>
</kafkaTransport.publishMessages>
<payloadFactory media-type="json">
<format>
{"topic":"$1", "partition":"$2", "offset":"$3"}
</format>
<args>
<arg evaluator="xml" expression="$ctx:topic"/>
<arg evaluator="xml" expression="$ctx:partition"/>
<arg evaluator="xml" expression="$ctx:offset"/>
</args>
</payloadFactory>
<property name="messageType" scope="axis2" type="STRING" value="application/json"/>
<respond/>
</inSequence>
<outSequence/>
<faultSequence/>
</target>
</proxy>
现在我可以发送邮件请求到http://localhost:8290/发布天气数据并在Kafka主题中获取。我还可以在wso2中接收Kafka生成的消息。如何从wso2向外部服务发送消息?我想我应该用
<endpoint [name="string"] [key="string"]>
address-endpoint | default-endpoint | wsdl-endpoint | load-balanced-endpoint | fail-over-endpoint
</endpoint>
但我不知道它必须添加到哪里以及如何配置
2条答案
按热度按时间mnowg1ta1#
我只是补充一句
排序并得到我想要的
cx6n0qe32#
您可以使用调用中介器[1]或发送中介器[2]来实现您的用例。在中介器中,可以定义要调用的所需端点。请参考以下配置示例。这里我们使用了一个调用中介来调用外部端点http://run.mocky.io/v3/9cf4b844-57c1-4fa5-a101-881dc36385bd.
在您的用例中,如果您已经在负载工厂中介器之后完成构建所需的负载,那么您可以在负载工厂中介器之后使用调用中介器来调用外部端点。这里,有效负载工厂中介器生成的有效负载将用于调用外部端点。
此外,对于weatherdatatransmitinboundep(入站端点),它将读取发布到kafka的消息,然后将消息发送到入站端点中定义的序列。如果要将weatherdatatransmitinboundep使用的消息发送到外部端点,则必须采用不同的方法。在您的例子中,weatherdataprocessseq是在从kafka读取消息之后调用的。因此,如果您的需求是用kafka发送消息,那么您需要在weatherdataprocessseq中定义调用或发送中介。
如果您想进一步澄清有关呼叫/发送调解人,请参阅博客文章[3]。
[1]-https://docs.wso2.com/display/ei6xx/call+mediator [2]-https://docs.wso2.com/display/ei600/send+mediator [3]-https://www.yenlo.com/blog/wso2torial-to-send-or-not-to-send-that-is-your-choice