SpringXDRabbit源模块无法处理消息,第一条消息保持未确认状态

j8yoct9x  于 2021-06-03  发布在  Hadoop
关注(0)|答案(2)|浏览(542)

我正在尝试使用简单的springxd应用程序在hdfs中加载日志事件。我已经用 spring-ampq/rabbit log4j appender (the) org.springframework.amqp.rabbit.log4j.AmqpAppender 类)将日志消息传输到预配置的exchange。我将下面的流设置为从中提取这些消息并将它们推送到hdfs,其中soruce和sink模块都是现成的xd模块,
流定义,

xd:>stream create --name demoQ1 --definition "rabbit | hdfs --rollover=15 --directory=/user/root" --deploy

创建并部署了新的流“demoq1”

xd:>stream list
  Stream Name  Stream Definition                                   Status
  -----------  --------------------------------------------------  --------
  demoQ1       rabbit | hdfs --rollover=15 --directory=/user/root  deployed

amqp appender正在将消息发布到exchange,并将其路由到demoq1队列,rabbit source在该队列中拾取第一条消息,然后由于不确认该消息而卡住。原因是什么?

qpgpyjmq

qpgpyjmq1#

在运行rabbit模块的spring xd容器上启用调试日志,它显示第一条消息重复发生以下异常,并且消息被重新查询回来,因此消息保持未确认状态,rabbit源无法处理进一步的消息。。
为了解决这个问题,我从log4j appender属性中删除了这个属性, log4j.appender.amqp.contentEncoding=null . 这个属性显式地将编码器的名称指定为“null”,这似乎是一个bug。我希望null表示没有指定编码器:)
日志中出现异常,消息被拒绝并重新排队返回时连续重复。。

19:29:17,713 DEBUG SimpleAsyncTaskExecutor-1 listener.BlockingQueueConsumer:268 - Received message: (Body:'Hello'MessageProperties [headers={categoryName=org.apache.hadoop.yarn.server.nodemanager.NodeManager, level=INFO}, timestamp=Sat Apr 19 19:21:52 PDT 2014, messageId=null, userId=null, appId=NodeManager, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=null, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=true, receivedExchange=test-exch, receivedRoutingKey=rk1, deliveryTag=184015, messageCount=0]) 19:29:17,715 WARN SimpleAsyncTaskExecutor-1 listener.SimpleMessageListenerContainer:530 - Execution of Rabbit message listener failed, and no ErrorHandler has been set. org.springframework.amqp.rabbit.listener.ListenerExecutionFailedException: Listener threw exception at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:751) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:690) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:583) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:75) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:154) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1111) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:556) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:904) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:888) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$500(SimpleMessageListenerContainer.java:75) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:989) at java.lang.Thread.run(Thread.java:722) Caused by: org.springframework.amqp.support.converter.MessageConversionException: failed to convert text-based Message content at org.springframework.amqp.support.converter.SimpleMessageConverter.fromMessage(SimpleMessageConverter.java:100) at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$1.onMessage(AmqpInboundChannelAdapter.java:73) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:688) ... 10 more Caused by: java.io.UnsupportedEncodingException: null at java.lang.StringCoding.decode(StringCoding.java:190) at java.lang.String.(String.java:416) at java.lang.String.(String.java:481) at org.springframework.amqp.support.converter.SimpleMessageConverter.fromMessage(SimpleMessageConverter.java:97) ... 12 more 19:29:17,715 DEBUG SimpleAsyncTaskExecutor-1 listener.BlockingQueueConsumer:657 - Rejecting messages (requeue=true)
ckx4rj1h

ckx4rj1h2#

在您的容器日志中,您是否看到这样的信息:“未能将消息负载写入hdfs”?
如果是这样,则需要使用模块之间的类型转换。从rabbit源到hdfs接收器,消息将只是字节数组。
你的流定义可以是,
stream create--name demoq1--definition“兔子--outputtype=text/plain | hdfs--rollover=15--directory=/user/root”--deploy
或者,
stream create--name demoq1--定义“rabbit | hdfs--inputtype=text/plain--rollover=15--directory=/user/root”--部署
分别注意source/sink中的outputtype或inputtype选项。在本例中,hdfs接收器的hdfsstoremessagehandler期望有效负载为string类型。
有关类型转换的详细信息,请查看以下内容:https://github.com/spring-projects/spring-xd/wiki/type-conversion

相关问题