java—在ApacheSpark中,将javardd< row>转换为dataset< row>会产生异常:arraylist不是字符串模式的有效外部类型

wsewodh2  于 2021-06-09  发布在  Hbase
关注(0)|答案(1)|浏览(312)

我正在使用hbase spark connector将hbase数据提取到spark中 JavaRDD<Row> (我觉得我能够成功地做到这一点,因为我能够打印提取的hbase数据)。那么,我想把它转换成 JavaRDD<Row>Dataset<Row> . 但它给我的错误,这是进一步的职位。首先让我看看我的代码是什么样子的。

private static JavaRDD<Row> loadHBaseRDD() throws ParseException
{
    //form list of row keys
    List<byte[]> rowKeys = new ArrayList<byte[]>(5);
    //consider ids is class level variable
    ids.forEach(id -> {
        rowKeys.add(Bytes.toBytes(id));     
    });
    JavaRDD<byte[]> rdd = jsc.parallelize(rowKeys);

    //make hbase-spark connector call 
    JavaRDD resultJRDD = jhbc.bulkGet(TableName.valueOf("table1"),2,rdd,new GetFunction(),new ResultFunction());

    return resultJRDD;
}

注意 bulkGet() 接受示例 GetFunction 以及 RsultFunction 班级。 GetFunction 类有一个方法,该方法返回 Get 类(来自hbase客户端):

public static class GetFunction implements Function<byte[], Get> {
    private static final long serialVersionUID = 1L;
    public Get call(byte[] v) throws Exception {
        return new Get(v);
    }
}

这个 ResultFunction 具有转换示例的函数 Result (hbase客户端类)到 Row :

public static class ResultFunction implements Function<Result, Row> 
{
    private static final long serialVersionUID = 1L;
    public Row call(Result result) throws Exception 
    {
        List<String> values = new ArrayList<String>(); //notice this is arraylist, we talk about this latter

        for (Cell cell : result.rawCells()) {
            values.add(Bytes.toString(CellUtil.cloneValue(cell)));
        }
        return RowFactory.create(values);
    }
}

当我打电话的时候 loadHBaseRDD() 并打印返回值,正确打印值:

JavaRDD<Row> hbaseJavaRDD = loadHBaseRDD();
hbaseJavaRDD.foreach(row -> { 
    System.out.println(row);   //this prints rows correctly
});

这意味着行已经从hbase正确地提取到spark。现在我要皈依 JavaRDD<Row>Dataset<Row> 正如这里所解释的。因此我首先创造 StructType :

StructType schema = //create schema

然后我试着转换 JavaRDD 到Dataframe:

Dataset<Row> hbaseDataFrame = sparksession1.createDataFrame(hbaseJavaRDD, schema);
hbaseDataFrame.show(false);

这会引发异常,在第行发生非常大的stacktrace(下面只显示了其中的一部分) hbaseDataFrame.show(false) 第一行如下:

java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.util.ArrayList is not a valid external type for schema of string

看来,因为 values 属于类型 ArrayList 内部 ResultFunction.call() ,这是个例外 java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.util.ArrayList is not a valid external type for schema of string .
在stackoveflow上有一个[类似的问题],它的答案是,应该返回列表,而不是列表 String[][] . 虽然我不明白为什么要回来 String[][] ,我修改了 ResultFunction 拥有 values 类型 String[][] :

public static class ResultFunction implements Function<Result, Row> 
{
    private static final long serialVersionUID = 1L;
    public Row call(Result result) throws Exception 
    {
        String[] values = new String[result.rawCells().length];
        String[][] valuesWrapped = new String[1][]; 

        for(int i=0;i<result.rawCells().length;i++)
        {
            values[i] = Bytes.toString(CellUtil.cloneValue(result.rawCells()[i]));
        }
        valuesWrapped[0] = values;
        return RowFactory.create(valuesWrapped);
    }
}

它在同一行给出了以下例外 hbaseDataFrame.show(false) :

java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: [[Ljava.lang.String; is not a valid external type for schema of string

最后我修改了 ResultFunction 又上课了 values 类型的变量 String[] :

public static class ResultFunction implements Function<Result, Row>
{
    private static final long serialVersionUID = 1L;
    public Row call(Result result) throws Exception 
    {
        String[] values = new String[result.rawCells().length];     
        for(int i=0;i<result.rawCells().length;i++)
        {
            values[i] = Bytes.toString(CellUtil.cloneValue(result.rawCells()[i]));
        }
        return values;
    }
}

这给了我一个例外,大堆栈跟踪有起始线:

java.lang.RuntimeException: Error while encoding: java.lang.ArrayIndexOutOfBoundsException: 14

那么这里可能出了什么问题?我该怎么做?

cvxl0en2

cvxl0en21#

(返回的)最后方法 String[] values )他是对的。问题在于格式错误的模式。似乎我最终在模式中比数据中多了一列(由于架构字符串中包含由单个空格分隔的列,因此使用了额外的空格字符。额外的空间正在创建额外的列。)

相关问题