spark流和kafka:从hdfs读取文件的最佳方式

nnt7mjpx  于 2021-06-08  发布在  Kafka
关注(0)|答案(0)|浏览(289)

脚本
我们希望接收csv文件(大约10 mb),这些文件存储在hdfs中。然后,进程将向kafka主题发送一条消息(消息包含hdfs位置等文件元数据)。
spark流作业侦听这个kafka主题,在接收到消息后,它应该从hdfs读取文件并处理该文件。
在上述场景中,从hdfs读取文件最有效的方法是什么?
读Kafka的话

JavaInputStream<ConsumerRecord<String, FileMetaData>> messages = KafkaUtils.createDirectStream(...);
JavaDStream<FileMetaData> files = messages.map(record -> record.value());

选项1-使用平面图功能

JavaDStream<String> allRecords = files.flatMap(file -> {
    ArrayList<String> records = new ArrayList<>();
    Path inFile = new Path(file.getHDFSLocation());
    // code to read file from HDFS
    return records;
});

选项2-使用ACHRDD

ArrayList<String> records = new ArrayList<>();
files.foreachRDD(rdd -> {
    rdd.foreachPartition(part -> {
        while(part.hasNext()) {
            Path inFile = new Path(part.next().getHDFSLocation());
            // code to read file from HDFS
            records.add(...);
        }
    }
}

JavaRDD<String> rddRecords = javaSparkContext.sparkContext().parallize(records);

哪种选择更好?另外,我应该使用spark上下文的内置方法从hdfs读取文件,而不是使用hdfs路径吗?
谢谢

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题