Scala: how to convert a Flink DataSet of tuples into a single column

7y4bm7vi · posted 2021-06-24 · in Flink
Follow (0) | Answers (1) | Views (329)

I have a DataSet of tuples:

  1 2
  1 4
  4 1
  4 2
  4 3
  3 2
  2 3

but I can't find a way to convert it into a single-column DataSet:

  1
  2
  1
  4
  4
  1
  ...

Here is my code. I used a Scala ListBuffer, but I couldn't find a way to do this directly on the Flink DataSet:

  import org.apache.flink.api.java.utils.ParameterTool
  import org.apache.flink.api.scala._
  import scala.collection.mutable.ListBuffer

  val params: ParameterTool = ParameterTool.fromArgs(args)
  val env = ExecutionEnvironment.getExecutionEnvironment
  env.getConfig.setGlobalJobParameters(params)

  val text = env.readTextFile(params.get("input"))
  val tupleText = text.map { line =>
    val arr = line.split(" ")
    (arr(0), arr(1))
  }

  // collect() pulls the whole DataSet back to the client,
  // so this flattening runs locally, not inside Flink
  val x: Seq[(String, String)] = tupleText.collect()
  val tempList = new ListBuffer[String]
  x.foreach { line =>
    tempList += line._1
    tempList += line._2
  }
  tempList.foreach(println)

zynd9foi1#

You can use flatMap for this:

  import org.apache.flink.util.Collector

  // get some input
  val input: DataSet[(Int, Int)] = env.fromElements((1, 2), (2, 3), (3, 4))
  // emit every tuple element as its own record
  val output: DataSet[Int] = input.flatMap { (t, out: Collector[Int]) =>
    out.collect(t._1)
    out.collect(t._2)
  }
  // print result
  output.print()
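The flattening logic itself can be checked locally with plain Scala collections, without a Flink cluster. This sketch mirrors the flatMap answer above; the object name `FlattenTuples` is my own, not part of the question:

```scala
// Plain-Scala analogue of the Flink flatMap answer: each (a, b) pair
// becomes two consecutive elements of a single flat sequence.
object FlattenTuples {
  def flatten(pairs: Seq[(Int, Int)]): Seq[Int] =
    pairs.flatMap { case (a, b) => Seq(a, b) }

  def main(args: Array[String]): Unit = {
    val input = Seq((1, 2), (1, 4), (4, 1))
    // prints 1, 2, 1, 4, 4, 1 on separate lines
    FlattenTuples.flatten(input).foreach(println)
  }
}
```

In the Flink version, the same per-tuple expansion happens in parallel on the workers instead of on one local collection.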
