如何阅读 avro
从 Flink
在 scala
?
批/流/表是否相同: StreamExecutionEnvironment
/ ExecutionEnvironment
/ TableEnvironment
?
会不会是这样: val custTS: TableSource = new AvroInputFormat("/path/to/file", ...)
下面是java avro实现参考(连接器),但在任何地方都找不到scala参考:
AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
DataSet<User> usersDS = env.createInput(users);
1条答案
按热度按时间b5lpy0ml1#
你可以用Flink的
InputFormats
,包括AvroInputFormat
,以及scala api:流式处理和批处理:
val avroInputStream = env.createInput(new AvroInputFormat[User](in, classOf[User]))
表api:tableEnv.registerTable("table", avroInputStream.toTable)