我有一个流,它监视目录中多个文件的输出,处理数据并将其放入hdfs。这是我的stream create命令:
stream create --name fileHdfs --definition "file --dir=/var/log/supervisor/ --pattern=tracker.out-*.log --outputType=text/plain | logHdfsTransformer | hdfs --fsUri=hdfs://192.168.1.115:8020 --directory=/data/log/appsync --fileName=log --partitionPath=path(dateFormat('yyyy/MM/dd'))" --deploy
问题是source:file module 将从文件读取的所有数据发送到日志处理模块,而不是每轮发送一行,因为有效负载字符串有数百万个字符,我无法处理它。前任:
--- PAYLOAD LENGTH---- 9511284
请告诉我使用时如何逐行阅读source:file module,谢谢!!!
4条答案
按热度按时间fivyi3re1#
我知道这可能已经晚了,但对于那些正在寻找解决方案的谷歌用户来说:
即使没有模块或选项可以自动执行,也只需添加一个拆分器,将传入消息拆分为多个传出消息。
请注意,您必须在使用\n和\r\n之间做出决定。检查你的文件,看看他们在用什么。
例子:
干杯!
3bygqnnd2#
springintegration有一个filespliter,可以将文本文件拆分成行。您可以使用它来创建自定义处理器模块,我们称之为文件拆分:
创建具有以下内容的file-split.xml文件:
将文件复制到${xd\u home}/xd/modules/processor/file split/config/(如果需要,创建路径)
示例用法:
如果需要,您可以进一步自定义该模块以获取更多选项。
jobtbby33#
它目前不受支持,但编写自定义
source
使用spring集成inbound-channel-adapter
调用一次读取一行的pojo。请打开一个新的功能jira问题。
你也可以用一个
job
而不是stream
在xd中。wwtsj6pe4#
您可以尝试在部署流时使用--mode=lines选项。请检查以下文件参考:http://docs.spring.io/spring-xd/docs/current/reference/html/#file
希望这有帮助!
干杯,普拉蒂克