如何使用map
和filter
函数来使用RDD读取CSV文件,并使用CSV文件根据特定列进行选择?这里是一个示例CSV文件。
VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount
1,2017-07-01 00:06:25,2017-07-01 00:10:50,1,1.20,1,N,249,90,1,5.5,0.5,0.5,1.35,0,0.3,8.15
1,2017-07-01 00:20:04,2017-07-01 00:21:38,2,.20,1,N,249,158,2,3,0.5,0.5,0,0,0.3,4.3
1,2017-07-01 00:44:10,2017-07-01 00:59:29,1,4.30,1,N,100,45,1,15.5,0.5,0.5,3.35,0,0.3,20.15
1,2017-07-01 00:07:33,2017-07-01 00:31:30,1,8.30,1,N,138,162,1,27,0.5,0.5,6.8,5.76,0.3,40.86
字符串
我尝试了下面的代码,但我不知道如何根据特定的列进行过滤并获取相关的行。
public class SparkUseCase{
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("CSV Reader").setMaster("local[*]");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> allRows = sc.textFile("in/trip_yellow_taxi.data");
System.out.println(allRows.take(5));
List<String> headers = Arrays.asList(allRows.take(1).get(0).split(","));
String field="VendorID";
JavaRDD<String>dataWithoutHeaders = allRows.filter(x -> !(x.split(",")[headers.indexOf(field)]).equals(field));
JavaRDD<Integer> VendorID = dataWithoutHeaders.map(x -> Integer.valueOf(x.split(",")[headers.indexOf(field)]));
for (Integer i:VendorID.collect()){
System.out.println(i);
}
}
}
型
感谢你的帮助。
过滤案例:过滤所有RatecodeID为4的记录。
2条答案
按热度按时间0dxa2lsx1#
使用当前代码,您需要使用正确的字段名称,而不是
VendorID
.字符串
但是,不要使用Spark1 RDD和
split(",")
的“穷人的CSV解析器”。将Spark2 csv阅读器与DataFrame一起使用。
Scala中的示例
型
juud5qan2#
您还可以使用SparkSession读取CSV并根据您的要求查询数据。
字符串