我试图在spring集成中实现一个定制的入站通道适配器,以使用来自apachekafka的消息。基于spring集成示例,我发现需要创建一个实现messagesource接口的类,并实现receive()方法,该方法将从kafka返回已使用的消息。但是基于kafka中的消费者示例,kafkastream中的消息迭代器由blockingqueue支持。因此,如果队列中没有消息,线程将被阻塞。
那么,实现receive()方法的最佳方法是什么呢?因为这个方法可能会阻塞,直到有东西可以使用?
在更一般的意义上,我们如何为流式消息源实现一个定制的入站通道,该通道将阻塞,直到有什么东西可以使用。。?
1条答案
按热度按时间wvyml7n51#
receive()方法可以阻塞(只要底层操作正确响应中断的线程),从入站通道适配器的Angular 来看,根据底层源的期望,最好使用固定延迟触发器。例如,“长轮询”可以在提供非常小的延迟值时模拟事件驱动的行为。
在jms轮询messagesource实现中也有类似的情况。在这里,底层行为由jmstemplate的receive()方法之一处理。jmstemplate本身允许配置超时值。这意味着,作为一个例子,您可以选择阻塞最多5秒,但在每个阻塞接收调用之间有一个非常短的延迟触发器。或者,可以指定不确定的接收超时。决策最终取决于对底层资源、消息吞吐量等的期望。
另外,我想让你知道,我们正在探索Kafka适配器自己。也许您想在spring集成扩展存储库中对此进行协作?
你好,马克