我正在尝试迭代数据源:
val env = ExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val job = Job.getInstance
FileInputFormat.addInputPath(
job,
new Path("file.parquet.gz")
)
val hadoopInputFormat: HadoopInputFormat[Void, GenericRecord] =
new HadoopInputFormat(
new AvroParquetInputFormat[GenericRecord],
classOf[Void],
classOf[GenericRecord],
job
)
val data: DataSource[tuple.Tuple2[Void, GenericRecord]] = env.createInput(hadoopInputFormat)
当我做data.print时,我可以看到元组中的数据。
但当我这么做的时候:
data.map
{
res =>
println("!!!!!!!!!!!111")
println( res.f1)
}
什么也没印出来。
我想迭代数据源并获取genericord。请帮帮我。
2条答案
按热度按时间kzipqqlq1#
为了执行flink批处理程序而不调用
print
或者collect
,你需要打电话env.execute()
. 在没有上述api调用的情况下,只有这个调用才会触发程序的执行。e37o9pze2#
您可以使用data.collect然后使用:data.iterator().next()进行迭代