在flume生命周期中使用spring amqp onmessage()方法

9fkzdhlc  于 2021-06-04  发布在  Flume
关注(0)|答案(1)|浏览(388)

我需要听我开发的flume自定义源中的rabbit队列。这个要求在flume中可能看起来很尴尬,但这正是它所需要的。由于为了简单起见,我使用springamqp来监听队列,所以我无法理解如何在flume lifecycle start()方法中调用onmessage()方法,以便将消息发布到flume频道。我已经研究了springmessagelisteneradapter的概念,但是还没有找到任何实现相同概念的例子。

fquxozlt

fquxozlt1#

onMessage() 是…的一部分 MessageListener 图案。它是由外部系统(从大的高度)引发的某种活性成分。它每次都通过远程命令工作,所以你不能把它当作 passive 将由用户调用启动的组件。
因为你有“flume lifecycle start()”从另一边和 SimpleMessageListenerContainer 从侧面看也是一样的,我想说的是,你必须把它们的生命周期联系起来,才能协同工作。
从这里开始你应该提供 SimpleMessageListenerContainer 一些内联的 MessageListener 实现,调用所需的方法“发布到flume通道”。
hth公司
更新

SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
....
container.setMessageListener(new MessageListener() {

   public void onMessage(Message message) {
       sendMessageToFlumeChannel(message);
   }

});

在哪里 sendMessageToFlumeChannel 是保持类的方法。
当然可以是任何pojo而不是 MessageListener 实现,但主要目标是将侦听器结果委托给某个方法。

相关问题