我在努力创造 Dataset<Row>
对象来自 JavaRDD<Tuple2<Row, Row>>
对象。
我遵循以下步骤,
转换 Java<Tuple2<Row,Row>>
至 JavaRDD<Row>
使用 toDataset()
的功能 sqlContext
使用架构转换为数据集。
但是,在第一步,我不能使用 Row.fromTuple()
在代码中类似scala的函数。在第二步中,我无法使用rowtag进行转换。
下面是运行时错误。
Error: java: cannot find symbol
symbol: method fromTuple(scala.Tuple2<org.apache.spark.sql.Row,org.apache.spark.sql.Row>)
location: interface org.apache.spark.sql.Row
我试着像下面这样转变
ClassTag<Row> rowTag = scala.reflect.ClassTag$.MODULE$.apply(Row.class);
private Dataset<Row> joinResults(SparkSession session, RDD<Tuple2<Row, Row>> resultRDD) {
JavaRDD<Tuple2<Row, Row>> results = resultRDD.toJavaRDD();
JavaRDD<Row> ds = results.map(new Function<Tuple2<Row, Row>, Row>() {
@Override
public Row call(Tuple2<Row, Row> rowRowTuple2) throws Exception {
return Row.fromTuple(rowRowTuple2); // run time error
}
});
return session.sqlContext().createDataset(ds, rowTag); //gives error
}
任何帮助都将不胜感激。我正在使用lucenerdd记录链接,它将rdd返回给我,这样我就没有直接对数据集执行操作的选项。我不想每次都创建模式/编码器,因为那样会限制链接函数的使用。我使用的是scala 2.11和spark 2.4.3 libs。
2条答案
按热度按时间fgw7neuy1#
.createDataset()
接受RDD<T>
不是JavaRDD<T>.
你需要使用ds.rdd()
你需要创建和传递org.apache.spark.sql.catalyst.encoders.RowEncoder
不要按创建一行Row.fromTuple(rowRowTuple2)
(即每个元素都是一行的行)。单行应包含基元类型或嵌套结构(示例)。tkclm6bt2#
也许这是有用的-
tuple2<row,row>->数据集
tuple2<integer,string>->数据集
大多数sparkapi都是在scala seq上工作的,最好有下面的实用工具方便地转换java list->scala序列