对于kafka出站通道适配器,我有以下xml配置:
<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
kafka-producer-context-ref="kafkaProducerContext"
auto-startup="true"
channel="activityOutputChannel">
<int:poller fixed-delay="1000" time-unit="MILLISECONDS" receive-timeout="0" task-executor="taskExecutor"/>
</int-kafka:outbound-channel-adapter>
<task:executor id="taskExecutor"
pool-size="5-25"
queue-capacity="20"
keep-alive="120"/>
这很管用。我正试图在javadsl中复制这一点,但我不能做得太远。到目前为止,我只有这个:
.handle(Kafka.outboundChannelAdapter(kafkaConfig)
.addProducer(producerMetadata, brokerAddress)
.get());
我不知道如何添加 taskExecutor
以及 poller
使用dsl。
关于如何将这些整合到我的整体计划中有什么见解吗 IntegrationFlow
非常感谢。
1条答案
按热度按时间pxyaymoc1#
spring集成组件(例如。
<int-kafka:outbound-channel-adapter>
)由两个豆子组成:AbstractEndpoint
接受来自input-channel
以及MessageHandler
处理消息。所以,
Kafka.outboundChannelAdapter()
是关于MessageHandler
. 任何其他特定于端点的属性都由第二个Consumer<GenericEndpointSpec<H>> endpointConfigurer
的论点.handle()
eip方法:有关更多信息,请参阅参考手册。