Spark JavaRDD程序读取csv和过滤器

camsedfj  于 2023-11-22  发布在  Apache
关注(0)|答案(2)|浏览(213)

如何使用mapfilter函数来使用RDD读取CSV文件,并使用CSV文件根据特定列进行选择?这里是一个示例CSV文件。

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount

1,2017-07-01 00:06:25,2017-07-01 00:10:50,1,1.20,1,N,249,90,1,5.5,0.5,0.5,1.35,0,0.3,8.15
1,2017-07-01 00:20:04,2017-07-01 00:21:38,2,.20,1,N,249,158,2,3,0.5,0.5,0,0,0.3,4.3
1,2017-07-01 00:44:10,2017-07-01 00:59:29,1,4.30,1,N,100,45,1,15.5,0.5,0.5,3.35,0,0.3,20.15
1,2017-07-01 00:07:33,2017-07-01 00:31:30,1,8.30,1,N,138,162,1,27,0.5,0.5,6.8,5.76,0.3,40.86

字符串
我尝试了下面的代码,但我不知道如何根据特定的列进行过滤并获取相关的行。

public class SparkUseCase{

    public static void main(String[] args) {    
    
        SparkConf conf = new SparkConf().setAppName("CSV Reader").setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> allRows = sc.textFile("in/trip_yellow_taxi.data");
        System.out.println(allRows.take(5));
        List<String> headers = Arrays.asList(allRows.take(1).get(0).split(","));

        String field="VendorID";
        
        JavaRDD<String>dataWithoutHeaders = allRows.filter(x -> !(x.split(",")[headers.indexOf(field)]).equals(field));
        
        JavaRDD<Integer> VendorID = dataWithoutHeaders.map(x -> Integer.valueOf(x.split(",")[headers.indexOf(field)]));
                
        for (Integer i:VendorID.collect()){
            System.out.println(i);
        }
        
      }
}


感谢你的帮助。
过滤案例:过滤所有RatecodeID为4的记录。

0dxa2lsx

0dxa2lsx1#

使用当前代码,您需要使用正确的字段名称,而不是VendorID.

rdd.filter(x -> x.split(",")[index].equals("4"));

字符串

但是,不要使用Spark1 RDD和split(",")的“穷人的CSV解析器”。

将Spark2 csv阅读器与DataFrame一起使用。
Scala中的示例

val spark = SparkSession.builder().getOrCreate() 
val df = spark
  .read
  .format("csv")
  .option("header", "true")
  .load("in/trip_yellow_taxi.data")

val rates4 = df.filter("RatecodeID == 4")
rates4.show(false)

juud5qan

juud5qan2#

您还可以使用SparkSession读取CSV并根据您的要求查询数据。

SparkSession spark = SparkSession.builder().appName("CDX JSON Merge Job")
                .getOrCreate();
Dataset<Row> csvDataset = spark.read().format("csv").option("header", "true")
                .load("C:\\sample.csv");
csvDataset.createOrReplaceTempView("csvdataTable");
Dataset<Row> reducedCSVDataset = spark.sql("select VendorID from csvdataTable limit 2 ");
Dataset<String> rdds = reducedCSVDataset.toDF().select("VendorID").as(Encoders.STRING());
List<String> listOfStrings = rdds.collectAsList();
listOfStrings.forEach(x -> System.out.println(x));

字符串

相关问题