spring-xd如何使用source:file read 一行接一行

1sbrub3j  于 2021-06-04  发布在  Hadoop
关注(0)|答案(4)|浏览(368)

我有一个流,它监视目录中多个文件的输出,处理数据并将其放入hdfs。这是我的stream create命令:

stream create --name fileHdfs --definition "file --dir=/var/log/supervisor/ --pattern=tracker.out-*.log --outputType=text/plain | logHdfsTransformer | hdfs --fsUri=hdfs://192.168.1.115:8020 --directory=/data/log/appsync --fileName=log --partitionPath=path(dateFormat('yyyy/MM/dd'))" --deploy

问题是source:file module 将从文件读取的所有数据发送到日志处理模块,而不是每轮发送一行,因为有效负载字符串有数百万个字符,我无法处理它。前任:

--- PAYLOAD LENGTH---- 9511284

请告诉我使用时如何逐行阅读source:file module,谢谢!!!

fivyi3re

fivyi3re1#

我知道这可能已经晚了,但对于那些正在寻找解决方案的谷歌用户来说:
即使没有模块或选项可以自动执行,也只需添加一个拆分器,将传入消息拆分为多个传出消息。
请注意,您必须在使用\n和\r\n之间做出决定。检查你的文件,看看他们在用什么。
例子:

stream create --name filetest --definition "file --outputType=text/plain --dir=/tmp/examplefiles/| splitter --expression=payload.split('\\n') | log" --deploy

干杯!

3bygqnnd

3bygqnnd2#

springintegration有一个filespliter,可以将文本文件拆分成行。您可以使用它来创建自定义处理器模块,我们称之为文件拆分:
创建具有以下内容的file-split.xml文件:

<?xml version="1.0" encoding="UTF-8"?>
beans xmlns="http://www.springframework.org/schema/beans"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xmlns:int="http://www.springframework.org/schema/integration"
   xsi:schemaLocation="http://www.springframework.org/schema/beans        
   http://www.springframework.org/schema/beans/spring-beans.xsd
   http://www.springframework.org/schema/integration 
   http://www.springframework.org/schema/integration/spring-integration.xsd">

<int:splitter input-channel="input" output-channel="output">
        <bean class="org.springframework.integration.file.splitter.FileSplitter">
                <constructor-arg value="false"/>
        </bean>
</int:splitter>

<int:channel id="output"/> 
</beans>

将文件复制到${xd\u home}/xd/modules/processor/file split/config/(如果需要,创建路径)
示例用法:

stream create --name splitFile --definition "file --dir=/data --ref=true | file-split | log" --deploy

如果需要,您可以进一步自定义该模块以获取更多选项。

jobtbby3

jobtbby33#

它目前不受支持,但编写自定义 source 使用spring集成 inbound-channel-adapter 调用一次读取一行的pojo。
请打开一个新的功能jira问题。
你也可以用一个 job 而不是 stream 在xd中。

wwtsj6pe

wwtsj6pe4#

您可以尝试在部署流时使用--mode=lines选项。请检查以下文件参考:http://docs.spring.io/spring-xd/docs/current/reference/html/#file
希望这有帮助!
干杯,普拉蒂克

相关问题