avroinputformat返回一组对象地址而不是值

vsmadaxz  于 2021-06-24  发布在  Flink
关注(0)|答案(1)|浏览(326)

我在用flink avrooutputformat写一些数据,

val source: DataSet[Row] = environment.createInput(inputBuilder.finish)
val tableEnv: BatchTableEnvironment = new BatchTableEnvironment(environment, TableConfig.DEFAULT)
val table: Table = source.toTable(tableEnv)
val avroOutputFormat = new AvroOutputFormat[Row](classOf[Row])
avroOutputFormat.setCodec(AvroOutputFormat.Codec.NULL)
source.write(avroOutputFormat, "/Users/x/Documents/test_1.avro").setParallelism(1)
environment.execute()

这会将数据写入名为 test_1.avro . 当我试图把文件读作,

val users = new AvroInputFormat[Row](new Path("/Users/x/Documents/test_1.avro"), classOf[Row])
val usersDS = environment.createInput(users)
usersDS.print()

这会将行打印为,

java.lang.Object@4462efe1,java.lang.Object@7c3e4b1a,java.lang.Object@2db4ad1,java.lang.Object@765d55d5,java.lang.Object@2513a118,java.lang.Object@2bfb583b,java.lang.Object@73ae0257,java.lang.Object@6fc1020a,java.lang.Object@5762658b

有没有一种方法可以打印这些数据值而不是对象地址。

3bygqnnd

3bygqnnd1#

您正在以一种奇怪的方式混合表api和数据流api。最好坚持使用一个api或使用适当的转换方法。
实际上,您基本上没有让flink知道预期的输入/输出模式。 classOf[Row] 什么都不是。
要将表写入avro文件,请使用表连接器。基本素描

tableEnv.connect(new FileSystem("/path/to/file"))
    .withFormat(new Avro().avroSchema("...")) // <- Adjust
    .withSchema(schema)
    .createTemporaryTable("AvroSinkTable")
table.insertInto("AvroSinkTable")

编辑:到目前为止,文件系统连接器不幸不支持avro。
因此,除了使用dataset api之外别无选择。我建议使用avrohugger为avro模式生成适当的scala类。

// convert to your scala class
val dsTuple: DataSet[User] = tableEnv.toDataSet[User](table)
// write out
val avroOutputFormat = new AvroOutputFormat<>(User.class)
avroOutputFormat.setCodec(Codec.SNAPPY)
avroOutputFormat.setSchema(User.SCHEMA$)
specificUser.write(avroOutputFormat, outputPath1)

相关问题