作为一个spark新手,我想写一个spark单元测试来测试我的函数 implements MapFunction<Row, Row>
,我在用一些我想用编程方式创建的avro对象创建sparkDataframe时遇到了问题。
这就是我现在所拥有的:
AnAvroRecord inputElement = getAvroRecord();//avro record extends SpecificRecordBase implements SpecificRecord
Schema schema = inputElement.getSchema(); //its avro schema
Row row = avroToRowConverter(event); //not sure about this function
List<Row> rows = Arrays.asList(row);
StructType avroRecordAsStructType = (StructType) SchemaConverters.toSqlType(AnAvroRecord.getClassSchema())
.dataType();
Dataset<Row> csDatasetRows = this.sparkSession.createDataFrame(rows, avroRecordAsStructType);
public static <A extends GenericRecord> Row avroToRowConverter(A avroRecord) {
if (null == avroRecord) {
return null;
}
Object[] objectArray = new Object[avroRecord.getSchema().getFields().size()];
StructType structType = (StructType) SchemaConverters.toSqlType(avroRecord.getSchema()).dataType();
for (Schema.Field field : avroRecord.getSchema().getFields()) {
objectArray[field.pos()] = avroRecord.get(field.pos());
}
return new GenericRowWithSchema(objectArray, structType);
}
我接受了这个任务 avroToRowConverter
从这里开始http://bytepadding.com/big-data/spark/convert-avro-object-to-row/. 我甚至不确定它是否可以工作,因为它抛出了一个异常:
java.lang.IllegalArgumentException: The value ((a good json serialization of an inner object of AnAvroRecord)) of the type (the type) cannot be converted to struct<...>
at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:262)
在某种程度上,java对象似乎可以转换为结构,即使我看到类型是对齐的并且可以匹配。
如何用编程方式创建的一些avro对象创建sparkDataframe?
我正在使用spark 2.4.5。
暂无答案!
目前还没有任何答案,快来回答吧!