我正在使用hbase spark connector将hbase数据提取到spark中 JavaRDD<Row>
(我觉得我能够成功地做到这一点,因为我能够打印提取的hbase数据)。那么,我想把它转换成 JavaRDD<Row>
至 Dataset<Row>
. 但它给我的错误,这是进一步的职位。首先让我看看我的代码是什么样子的。
private static JavaRDD<Row> loadHBaseRDD() throws ParseException
{
//form list of row keys
List<byte[]> rowKeys = new ArrayList<byte[]>(5);
//consider ids is class level variable
ids.forEach(id -> {
rowKeys.add(Bytes.toBytes(id));
});
JavaRDD<byte[]> rdd = jsc.parallelize(rowKeys);
//make hbase-spark connector call
JavaRDD resultJRDD = jhbc.bulkGet(TableName.valueOf("table1"),2,rdd,new GetFunction(),new ResultFunction());
return resultJRDD;
}
注意 bulkGet()
接受示例 GetFunction
以及 RsultFunction
班级。 GetFunction
类有一个方法,该方法返回 Get
类(来自hbase客户端):
public static class GetFunction implements Function<byte[], Get> {
private static final long serialVersionUID = 1L;
public Get call(byte[] v) throws Exception {
return new Get(v);
}
}
这个 ResultFunction
具有转换示例的函数 Result
(hbase客户端类)到 Row
:
public static class ResultFunction implements Function<Result, Row>
{
private static final long serialVersionUID = 1L;
public Row call(Result result) throws Exception
{
List<String> values = new ArrayList<String>(); //notice this is arraylist, we talk about this latter
for (Cell cell : result.rawCells()) {
values.add(Bytes.toString(CellUtil.cloneValue(cell)));
}
return RowFactory.create(values);
}
}
当我打电话的时候 loadHBaseRDD()
并打印返回值,正确打印值:
JavaRDD<Row> hbaseJavaRDD = loadHBaseRDD();
hbaseJavaRDD.foreach(row -> {
System.out.println(row); //this prints rows correctly
});
这意味着行已经从hbase正确地提取到spark。现在我要皈依 JavaRDD<Row>
至 Dataset<Row>
正如这里所解释的。因此我首先创造 StructType
:
StructType schema = //create schema
然后我试着转换 JavaRDD
到Dataframe:
Dataset<Row> hbaseDataFrame = sparksession1.createDataFrame(hbaseJavaRDD, schema);
hbaseDataFrame.show(false);
这会引发异常,在第行发生非常大的stacktrace(下面只显示了其中的一部分) hbaseDataFrame.show(false)
第一行如下:
java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.util.ArrayList is not a valid external type for schema of string
看来,因为 values
属于类型 ArrayList
内部 ResultFunction.call()
,这是个例外 java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.util.ArrayList is not a valid external type for schema of string
.
在stackoveflow上有一个[类似的问题],它的答案是,应该返回列表,而不是列表 String[][]
. 虽然我不明白为什么要回来 String[][]
,我修改了 ResultFunction
拥有 values
类型 String[][]
:
public static class ResultFunction implements Function<Result, Row>
{
private static final long serialVersionUID = 1L;
public Row call(Result result) throws Exception
{
String[] values = new String[result.rawCells().length];
String[][] valuesWrapped = new String[1][];
for(int i=0;i<result.rawCells().length;i++)
{
values[i] = Bytes.toString(CellUtil.cloneValue(result.rawCells()[i]));
}
valuesWrapped[0] = values;
return RowFactory.create(valuesWrapped);
}
}
它在同一行给出了以下例外 hbaseDataFrame.show(false)
:
java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: [[Ljava.lang.String; is not a valid external type for schema of string
最后我修改了 ResultFunction
又上课了 values
类型的变量 String[]
:
public static class ResultFunction implements Function<Result, Row>
{
private static final long serialVersionUID = 1L;
public Row call(Result result) throws Exception
{
String[] values = new String[result.rawCells().length];
for(int i=0;i<result.rawCells().length;i++)
{
values[i] = Bytes.toString(CellUtil.cloneValue(result.rawCells()[i]));
}
return values;
}
}
这给了我一个例外,大堆栈跟踪有起始线:
java.lang.RuntimeException: Error while encoding: java.lang.ArrayIndexOutOfBoundsException: 14
那么这里可能出了什么问题?我该怎么做?
1条答案
按热度按时间cvxl0en21#
(返回的)最后方法
String[] values
)他是对的。问题在于格式错误的模式。似乎我最终在模式中比数据中多了一列(由于架构字符串中包含由单个空格分隔的列,因此使用了额外的空格字符。额外的空间正在创建额外的列。)