Java: multiple select queries in a single job on the Flink Table API

jmo0nnb3 posted on 2021-07-15 in Flink

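If I run two different select queries on a Flink table created from a DataStream, the Blink planner runs them as two separate jobs. Is there a way to combine them so that they run as a single job? Example code: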

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(4);
    System.out.println("Running credit scores : ");
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    DataStream<String> recordsStream =
            env.readTextFile("src/main/resources/credit_trial.csv");
    DataStream<CreditRecord> creditStream = recordsStream
            .filter((FilterFunction<String>) line -> !line.contains(
                    "Loan ID,Customer ID,Loan Status,Current Loan Amount,Term,Credit Score,Annual Income,Years in current job" +
                    ",Home Ownership,Purpose,Monthly Debt,Years of Credit History,Months since last delinquent,Number of Open Accounts," +
                    "Number of Credit Problems,Current Credit Balance,Maximum Open Credit,Bankruptcies,Tax Liens"))
            .map(new MapFunction<String, CreditRecord>() {
                @Override
                public CreditRecord map(String s) throws Exception {
                    String[] fields = s.split(",");
                    return new CreditRecord(fields[0], fields[2], Double.parseDouble(fields[3]),
                            fields[4], fields[5].trim().equals("") ? 0.0 : Double.parseDouble(fields[5]),
                            fields[6].trim().equals("") ? 0.0 : Double.parseDouble(fields[6]),
                            fields[8], Double.parseDouble(fields[15]));
                }
            });
    tableEnv.createTemporaryView("CreditDetails", creditStream);
    Table creditDetailsTable = tableEnv.from("CreditDetails");
    Table resultsTable = creditDetailsTable.select($("*"))
            .filter($("loanStatus").isEqual("Charged Off"));
    TableResult result = resultsTable.execute();
    result.print();
    Table resultsTable2 = creditDetailsTable.select($("*"))
            .filter($("loanStatus").isEqual("Fully Paid"));
    TableResult result2 = resultsTable2.execute();
    result2.print();

The code above creates two separate jobs, which I don't want. Is there a way around this?
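One possible direction, offered as a sketch rather than a confirmed answer: each `Table.execute()` call submits its own job, so one way to end up with a single job is to convert both result tables back to DataStreams and call `env.execute()` once. The example below assumes Flink 1.13 or later (for `toDataStream`). The class name `SingleJobSelects`, the job name, the print labels, and the in-memory `fromElements` source are hypothetical stand-ins for the `CreditRecord` stream in the question.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

// Hypothetical class name; not part of the original question.
public class SingleJobSelects {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Assumption: a tiny in-memory stream stands in for creditStream.
        DataStream<String> statuses =
                env.fromElements("Charged Off", "Fully Paid", "Charged Off");

        // Register the stream as a one-column table named like the view in the question.
        Table details = tableEnv.fromDataStream(statuses).as("loanStatus");
        tableEnv.createTemporaryView("CreditDetails", details);

        Table chargedOff = tableEnv.from("CreditDetails")
                .filter($("loanStatus").isEqual("Charged Off"));
        Table fullyPaid = tableEnv.from("CreditDetails")
                .filter($("loanStatus").isEqual("Fully Paid"));

        // Instead of calling execute() on each table, attach both queries
        // to the same StreamExecutionEnvironment as DataStream sinks.
        tableEnv.toDataStream(chargedOff).print("charged-off");
        tableEnv.toDataStream(fullyPaid).print("fully-paid");

        // A single execute() submits both pipelines together as one job.
        env.execute("credit-status-queries");
    }
}
```

Flink also provides `StatementSet` (via `tableEnv.createStatementSet()`), which bundles several `INSERT INTO` pipelines into one job. That route needs sink tables registered with a schema matching `CreditRecord`, which is why the sketch above uses the DataStream conversion instead.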
