spark sql java无法将fromtuple转换为row和dataframe

68de4m5k  于 2021-05-29  发布在  Spark
关注(0)|答案(2)|浏览(663)

我在努力创造 Dataset<Row> 对象来自 JavaRDD<Tuple2<Row, Row>> 对象。
我遵循以下步骤,
转换 Java<Tuple2<Row,Row>>JavaRDD<Row> 使用 toDataset() 的功能 sqlContext 使用架构转换为数据集。
但是,在第一步,我不能使用 Row.fromTuple() 在代码中类似scala的函数。在第二步中,我无法使用rowtag进行转换。
下面是运行时错误。

Error: java: cannot find symbol
  symbol:   method fromTuple(scala.Tuple2<org.apache.spark.sql.Row,org.apache.spark.sql.Row>)
  location: interface org.apache.spark.sql.Row

我试着像下面这样转变

ClassTag<Row> rowTag = scala.reflect.ClassTag$.MODULE$.apply(Row.class);

private Dataset<Row> joinResults(SparkSession session, RDD<Tuple2<Row, Row>> resultRDD) {
    JavaRDD<Tuple2<Row, Row>> results = resultRDD.toJavaRDD();

    JavaRDD<Row> ds = results.map(new Function<Tuple2<Row, Row>, Row>() {
        @Override
        public Row call(Tuple2<Row, Row> rowRowTuple2) throws Exception {
            return Row.fromTuple(rowRowTuple2); // run time error
        }
    });

    return session.sqlContext().createDataset(ds, rowTag); //gives error
}

任何帮助都将不胜感激。我正在使用lucenerdd记录链接,它将rdd返回给我,这样我就没有直接对数据集执行操作的选项。我不想每次都创建模式/编码器,因为那样会限制链接函数的使用。我使用的是scala 2.11和spark 2.4.3 libs。

fgw7neuy

fgw7neuy1#

.createDataset() 接受 RDD<T> 不是 JavaRDD<T>. 你需要使用 ds.rdd() 你需要创建和传递 org.apache.spark.sql.catalyst.encoders.RowEncoder 不要按创建一行 Row.fromTuple(rowRowTuple2) (即每个元素都是一行的行)。单行应包含基元类型或嵌套结构(示例)。

tkclm6bt

tkclm6bt2#

也许这是有用的-

tuple2<row,row>->数据集

StructType schema = new StructType()
                .add(new StructField("id", DataTypes.IntegerType, true, Metadata.empty()))
                .add(new StructField("name", DataTypes.StringType, true, Metadata.empty()));

        JavaRDD<Tuple2<Row, Row>> tuple2JavaRDD = new JavaSparkContext(spark.sparkContext())
                .parallelize(
                        Arrays.asList(Tuple2.apply(RowFactory.create(1), RowFactory.create("a")),
                                Tuple2.apply(RowFactory.create(2), RowFactory.create("b")))
                );
        JavaRDD<Row> rowJavaRDD1 = tuple2JavaRDD.map(t -> Row$.MODULE$.merge(
                toScalaSeq(Arrays.asList(t._1, t._2))
        ));
        Dataset<Row> df1 = spark.createDataFrame(rowJavaRDD1, schema);
        df1.show(false);
        df1.printSchema();
        /**
         * +---+----+
         * |id |name|
         * +---+----+
         * |1  |a   |
         * |2  |b   |
         * +---+----+
         *
         * root
         *  |-- id: integer (nullable = true)
         *  |-- name: string (nullable = true)
         */

tuple2<integer,string>->数据集

JavaRDD<Tuple2<Integer, String>> resultRDD = new JavaSparkContext(spark.sparkContext())
                .parallelize(Arrays.asList(Tuple2.apply(1, "a"), Tuple2.apply(2, "b")));
        JavaRDD<Row> rowJavaRDD = resultRDD.map(Row$.MODULE$::fromTuple);
        Dataset<Row> dataFrame = spark.createDataFrame(rowJavaRDD, schema);
        dataFrame.show(false);
        dataFrame.printSchema();
        /**
         * +---+----+
         * |id |name|
         * +---+----+
         * |1  |a   |
         * |2  |b   |
         * +---+----+
         *
         * root
         *  |-- id: integer (nullable = true)
         *  |-- name: string (nullable = true)
         */

大多数sparkapi都是在scala seq上工作的,最好有下面的实用工具方便地转换java list->scala序列

<T> Buffer<T> toScalaSeq(List<T> list) {
        return JavaConversions.asScalaBuffer(list);
    }

相关问题