Java: how to read a DataFrame inside an RDD operation

6g8kf2rb, posted on 2021-05-27 in Spark

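Scenario: I have two lists of strings containing text file paths, list a and list b. I want to take the Cartesian product of lists a and b and compare the DataFrames read from each pair of paths.
The approach I tried is to compute the Cartesian product first, convert it to a pair RDD, and then apply an operation to each pair.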

List<String> a = Lists.newList("/data/1.text", "/data/2.text", "/data/3.text");
List<String> b = Lists.newList("/data/4.text", "/data/5.text", "/data/6.text");

JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
List<Tuple2<String, String>> cartesian = cartesian(a, b);
jsc.parallelizePairs(cartesian).filter(new Function<Tuple2<String, String>, Boolean>() {
        @Override public Boolean call(Tuple2<String, String> tup) throws Exception {
            Dataset<Row> text1 = spark.read().text(tup._1); // <-- this throws NullPointerException
            Dataset<Row> text2 = spark.read().text(tup._2);
            return text1.first() == text2.first(); // <-- this is an indicative function only
        }
    });
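
The snippet above calls a `cartesian(a, b)` helper that is not shown. A minimal sketch of what such a helper might look like follows. It is an assumption, not the asker's actual code: the class name `CartesianHelper` is hypothetical, and it uses the JDK's `Map.Entry` so that it runs without Spark on the classpath, whereas the question's version would build `scala.Tuple2` values instead.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class CartesianHelper {
    // Hypothetical helper: pairs every element of `left` with every element of `right`,
    // iterating `left` in the outer loop so pairs come out grouped by the left element.
    public static List<Map.Entry<String, String>> cartesian(List<String> left, List<String> right) {
        List<Map.Entry<String, String>> pairs = new ArrayList<>(left.size() * right.size());
        for (String l : left) {
            for (String r : right) {
                pairs.add(new SimpleImmutableEntry<>(l, r));
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        // Paths taken from the question; nothing is read from disk here.
        List<String> a = Arrays.asList("/data/1.text", "/data/2.text", "/data/3.text");
        List<String> b = Arrays.asList("/data/4.text", "/data/5.text", "/data/6.text");
        for (Map.Entry<String, String> pair : cartesian(a, b)) {
            System.out.println(pair.getKey() + " x " + pair.getValue());
        }
    }
}
```

With three paths in each list this yields nine pairs. To feed the result to `parallelizePairs`, the entries would have to be `Tuple2<String, String>` rather than `Map.Entry`.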

I could even compute the Cartesian product with Spark:

JavaRDD<String> sourceRdd = jsc.parallelize(a);
JavaRDD<String> allRdd = jsc.parallelize(b);

sourceRdd.cache().cartesian(allRdd).filter(new Function<Tuple2<String, String>, Boolean>() {
        @Override public Boolean call(Tuple2<String, String> tup) throws Exception {
            Dataset<Row> text1 = spark.read().text(tup._1);  // <-- same issue
            Dataset<Row> text2 = spark.read().text(tup._2);
            return text1.first() == text2.first();
        }
    });

Please suggest a good way to handle this.

zazmityj1#

I'm not sure I fully understand your question. Here is a Cartesian product example using Spark and Java.

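import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class CartesianDemo {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("CartesianDemo").setMaster("local");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        // Input lists
        List<String> listOne = Arrays.asList("one", "two", "three", "four", "five");
        List<String> listTwo = Arrays.asList("ww", "xx", "yy", "zz");
        // Turn each list into an RDD
        JavaRDD<String> rddOne = jsc.parallelize(listOne);
        JavaRDD<String> rddTwo = jsc.parallelize(listTwo);
        // Cartesian product: every element of rddOne paired with every element of rddTwo
        JavaPairRDD<String, String> cartesianRDD = rddOne.cartesian(rddTwo);
        // Print each pair (the output appears on whichever process runs the task)
        cartesianRDD.foreach(data -> {
            System.out.println("X=" + data._1() + " Y=" + data._2());
        });
        // close() also stops the underlying SparkContext
        jsc.close();
    }
}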
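
On the NullPointerException itself: a `SparkSession` is only usable on the driver, so calling `spark.read()` inside a function passed to `filter` (which runs on the executors) fails. One possible workaround, sketched below under assumptions, is to keep the pairing loop on the driver and let each `spark.read().text(...)` call run as its own Spark job. The class name `CompareOnDriver` and the helper `firstLinesMatch` are hypothetical, the paths are the ones from the question, and the comparison uses `equals` on the first line rather than `==`, which in Java only compares references. This is reasonable for a handful of files and is not meant as a definitive solution.

```java
import java.util.Arrays;
import java.util.List;
import org.apache.spark.sql.SparkSession;

public class CompareOnDriver {
    // Hypothetical helper: compares the first line of two text files.
    // Runs on the driver, where the SparkSession is valid.
    // Assumes both files are non-empty; first() throws on an empty Dataset.
    public static boolean firstLinesMatch(SparkSession spark, String path1, String path2) {
        String first1 = spark.read().text(path1).first().getString(0);
        String first2 = spark.read().text(path2).first().getString(0);
        return first1.equals(first2);
    }

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("CompareOnDriver")
                .master("local[*]")
                .getOrCreate();

        // Paths from the question; they must exist for this to run.
        List<String> a = Arrays.asList("/data/1.text", "/data/2.text", "/data/3.text");
        List<String> b = Arrays.asList("/data/4.text", "/data/5.text", "/data/6.text");

        // The Cartesian product is a plain nested loop on the driver.
        for (String left : a) {
            for (String right : b) {
                if (firstLinesMatch(spark, left, right)) {
                    System.out.println(left + " matches " + right);
                }
            }
        }
        spark.stop();
    }
}
```

The trade-off is that the pairs are processed one after another instead of in parallel; each individual read is still distributed by Spark.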
