脚本
我们希望接收csv文件(大约10 mb),这些文件存储在hdfs中。然后,进程将向kafka主题发送一条消息(消息包含hdfs位置等文件元数据)。
spark流作业侦听这个kafka主题,在接收到消息后,它应该从hdfs读取文件并处理该文件。
在上述场景中,从hdfs读取文件最有效的方法是什么?
读Kafka的话
JavaInputStream<ConsumerRecord<String, FileMetaData>> messages = KafkaUtils.createDirectStream(...);
JavaDStream<FileMetaData> files = messages.map(record -> record.value());
选项1-使用平面图功能
JavaDStream<String> allRecords = files.flatMap(file -> {
ArrayList<String> records = new ArrayList<>();
Path inFile = new Path(file.getHDFSLocation());
// code to read file from HDFS
return records;
});
选项2-使用ACHRDD
ArrayList<String> records = new ArrayList<>();
files.foreachRDD(rdd -> {
rdd.foreachPartition(part -> {
while(part.hasNext()) {
Path inFile = new Path(part.next().getHDFSLocation());
// code to read file from HDFS
records.add(...);
}
}
}
JavaRDD<String> rddRecords = javaSparkContext.sparkContext().parallize(records);
哪种选择更好?另外,我应该使用spark上下文的内置方法从hdfs读取文件,而不是使用hdfs路径吗?
谢谢
暂无答案!
目前还没有任何答案,快来回答吧!