我正在尝试使用简单的springxd应用程序在hdfs中加载日志事件。我已经用 spring-ampq/rabbit log4j appender
(the) org.springframework.amqp.rabbit.log4j.AmqpAppender
类)将日志消息传输到预配置的exchange。我将下面的流设置为从中提取这些消息并将它们推送到hdfs,其中soruce和sink模块都是现成的xd模块,
流定义,
xd:>stream create --name demoQ1 --definition "rabbit | hdfs --rollover=15 --directory=/user/root" --deploy
创建并部署了新的流“demoq1”
xd:>stream list
Stream Name Stream Definition Status
----------- -------------------------------------------------- --------
demoQ1 rabbit | hdfs --rollover=15 --directory=/user/root deployed
amqp appender正在将消息发布到exchange,并将其路由到demoq1队列,rabbit source在该队列中拾取第一条消息,然后由于不确认该消息而卡住。原因是什么?
2条答案
按热度按时间qpgpyjmq1#
在运行rabbit模块的spring xd容器上启用调试日志,它显示第一条消息重复发生以下异常,并且消息被重新查询回来,因此消息保持未确认状态,rabbit源无法处理进一步的消息。。
为了解决这个问题,我从log4j appender属性中删除了这个属性,
log4j.appender.amqp.contentEncoding=null
. 这个属性显式地将编码器的名称指定为“null”,这似乎是一个bug。我希望null表示没有指定编码器:)日志中出现异常,消息被拒绝并重新排队返回时连续重复。。
ckx4rj1h2#
在您的容器日志中,您是否看到这样的信息:“未能将消息负载写入hdfs”?
如果是这样,则需要使用模块之间的类型转换。从rabbit源到hdfs接收器,消息将只是字节数组。
你的流定义可以是,
stream create--name demoq1--definition“兔子--outputtype=text/plain | hdfs--rollover=15--directory=/user/root”--deploy
或者,
stream create--name demoq1--定义“rabbit | hdfs--inputtype=text/plain--rollover=15--directory=/user/root”--部署
分别注意source/sink中的outputtype或inputtype选项。在本例中,hdfs接收器的hdfsstoremessagehandler期望有效负载为string类型。
有关类型转换的详细信息,请查看以下内容:https://github.com/spring-projects/spring-xd/wiki/type-conversion