如何使用springxd批处理作业从kafka摄取数据到hdfs?我想有一个批作业,这是计划运行一天一次。我怎样才能追踪Kafka的偏移量?
roqulrg31#
我假设流设置 kafka | hdfs 无法帮助您将此作为批处理作业运行,以便您可以作为批处理作业进行编排。在本例中,可以运行kafka->hdfs的现成xd批处理作业模块还不可用。可以实现自定义批处理作业模块。为了阅读Kafka的信息,你需要一个 ItemReader 从kafka代理读取kafka消息的实现。请参阅amqpitemreader中的类似方法:https://github.com/spring-projects/spring-batch/blob/master/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/amqp/amqpitemreader.java查看spring集成kafka将有助于kafka的具体实现:https://github.com/spring-projects/spring-integration-kafka为了将数据写入hdfs,xd已经有了 org.springframework.xd.batch.item.hadoop.HdfsTextItemWriter .任何写入hdfs的现有xd批处理作业模块都将帮助您实现这一点。请随时打开jira,欢迎您的贡献。
kafka | hdfs
ItemReader
org.springframework.xd.batch.item.hadoop.HdfsTextItemWriter
1条答案
按热度按时间roqulrg31#
我假设流设置
kafka | hdfs
无法帮助您将此作为批处理作业运行,以便您可以作为批处理作业进行编排。在本例中,可以运行kafka->hdfs的现成xd批处理作业模块还不可用。可以实现自定义批处理作业模块。
为了阅读Kafka的信息,你需要一个
ItemReader
从kafka代理读取kafka消息的实现。请参阅amqpitemreader中的类似方法:https://github.com/spring-projects/spring-batch/blob/master/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/amqp/amqpitemreader.java
查看spring集成kafka将有助于kafka的具体实现:https://github.com/spring-projects/spring-integration-kafka
为了将数据写入hdfs,xd已经有了
org.springframework.xd.batch.item.hadoop.HdfsTextItemWriter
.任何写入hdfs的现有xd批处理作业模块都将帮助您实现这一点。请随时打开jira,欢迎您的贡献。