spark单元测试:以编程方式创建avro记录并构建Dataframe

cczfrluj  于 2021-07-13  发布在  Spark
关注(0)|答案(0)|浏览(243)

作为一个spark新手,我想写一个spark单元测试来测试我的函数 implements MapFunction<Row, Row> ,我在用一些我想用编程方式创建的avro对象创建sparkDataframe时遇到了问题。
这就是我现在所拥有的:

  1. AnAvroRecord inputElement = getAvroRecord();//avro record extends SpecificRecordBase implements SpecificRecord
  2. Schema schema = inputElement.getSchema(); //its avro schema
  3. Row row = avroToRowConverter(event); //not sure about this function
  4. List<Row> rows = Arrays.asList(row);
  5. StructType avroRecordAsStructType = (StructType) SchemaConverters.toSqlType(AnAvroRecord.getClassSchema())
  6. .dataType();
  7. Dataset<Row> csDatasetRows = this.sparkSession.createDataFrame(rows, avroRecordAsStructType);
  8. public static <A extends GenericRecord> Row avroToRowConverter(A avroRecord) {
  9. if (null == avroRecord) {
  10. return null;
  11. }
  12. Object[] objectArray = new Object[avroRecord.getSchema().getFields().size()];
  13. StructType structType = (StructType) SchemaConverters.toSqlType(avroRecord.getSchema()).dataType();
  14. for (Schema.Field field : avroRecord.getSchema().getFields()) {
  15. objectArray[field.pos()] = avroRecord.get(field.pos());
  16. }
  17. return new GenericRowWithSchema(objectArray, structType);
  18. }

我接受了这个任务 avroToRowConverter 从这里开始http://bytepadding.com/big-data/spark/convert-avro-object-to-row/. 我甚至不确定它是否可以工作,因为它抛出了一个异常:

  1. java.lang.IllegalArgumentException: The value ((a good json serialization of an inner object of AnAvroRecord)) of the type (the type) cannot be converted to struct<...>
  2. at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:262)

在某种程度上,java对象似乎可以转换为结构,即使我看到类型是对齐的并且可以匹配。
如何用编程方式创建的一些avro对象创建sparkDataframe?
我正在使用spark 2.4.5。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题