flink数据源迭代

doinxwow  于 2021-05-31  发布在  Hadoop
关注(0)|答案(2)|浏览(324)

我正在尝试迭代数据源:

val env = ExecutionEnvironment.getExecutionEnvironment
      env.setParallelism(1)

      val job = Job.getInstance
      FileInputFormat.addInputPath(
        job,
        new Path("file.parquet.gz")
      )

      val hadoopInputFormat: HadoopInputFormat[Void, GenericRecord] =
        new HadoopInputFormat(
          new AvroParquetInputFormat[GenericRecord],
          classOf[Void],
          classOf[GenericRecord],
          job
        )
       val data: DataSource[tuple.Tuple2[Void, GenericRecord]] = env.createInput(hadoopInputFormat)

当我做data.print时,我可以看到元组中的数据。
但当我这么做的时候:

data.map
     {
       res =>
         println("!!!!!!!!!!!111")
         println( res.f1)
     }

什么也没印出来。
我想迭代数据源并获取genericord。请帮帮我。

kzipqqlq

kzipqqlq1#

为了执行flink批处理程序而不调用 print 或者 collect ,你需要打电话 env.execute() . 在没有上述api调用的情况下,只有这个调用才会触发程序的执行。

e37o9pze

e37o9pze2#

您可以使用data.collect然后使用:data.iterator().next()进行迭代

相关问题