spark单元测试:以编程方式创建avro记录并构建Dataframe

cczfrluj  于 2021-07-13  发布在  Spark
关注(0)|答案(0)|浏览(213)

作为一个spark新手,我想写一个spark单元测试来测试我的函数 implements MapFunction<Row, Row> ,我在用一些我想用编程方式创建的avro对象创建sparkDataframe时遇到了问题。
这就是我现在所拥有的:

AnAvroRecord inputElement = getAvroRecord();//avro record extends SpecificRecordBase implements SpecificRecord
Schema schema = inputElement.getSchema(); //its avro schema
Row row = avroToRowConverter(event); //not sure about this function
List<Row> rows = Arrays.asList(row);
StructType avroRecordAsStructType = (StructType) SchemaConverters.toSqlType(AnAvroRecord.getClassSchema())
            .dataType();
Dataset<Row> csDatasetRows = this.sparkSession.createDataFrame(rows, avroRecordAsStructType);

public static <A extends GenericRecord> Row avroToRowConverter(A avroRecord) {
        if (null == avroRecord) {
            return null;
        }
        Object[] objectArray = new Object[avroRecord.getSchema().getFields().size()];
        StructType structType = (StructType) SchemaConverters.toSqlType(avroRecord.getSchema()).dataType();
        for (Schema.Field field : avroRecord.getSchema().getFields()) {
            objectArray[field.pos()] = avroRecord.get(field.pos());
        }
        return new GenericRowWithSchema(objectArray, structType);
    }

我接受了这个任务 avroToRowConverter 从这里开始http://bytepadding.com/big-data/spark/convert-avro-object-to-row/. 我甚至不确定它是否可以工作,因为它抛出了一个异常:

java.lang.IllegalArgumentException: The value ((a good json serialization of an inner object of AnAvroRecord)) of the type (the type) cannot be converted to struct<...>
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:262)

在某种程度上,java对象似乎可以转换为结构,即使我看到类型是对齐的并且可以匹配。
如何用编程方式创建的一些avro对象创建sparkDataframe?
我正在使用spark 2.4.5。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题