Java: Multiple select queries in a single job on the Flink Table API

Posted by jmo0nnb3 on 2021-07-15 in Flink

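If I run two different select queries on a Flink table created from a DataStream, the Blink planner runs them as two separate jobs. Is there a way to combine them so that they run as a single job? Sample code: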

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);

System.out.println("Running credit scores : ");

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

DataStream<String> recordsStream =
        env.readTextFile("src/main/resources/credit_trial.csv");

DataStream<CreditRecord> creditStream = recordsStream
        .filter((FilterFunction<String>) line -> !line.contains(
                "Loan ID,Customer ID,Loan Status,Current Loan Amount,Term,Credit Score,Annual Income,Years in current job" +
                        ",Home Ownership,Purpose,Monthly Debt,Years of Credit History,Months since last delinquent,Number of Open Accounts," +
                        "Number of Credit Problems,Current Credit Balance,Maximum Open Credit,Bankruptcies,Tax Liens"))
        .map(new MapFunction<String, CreditRecord>() {

            @Override
            public CreditRecord map(String s) throws Exception {

                String[] fields = s.split(",");

                return new CreditRecord(fields[0], fields[2], Double.parseDouble(fields[3]),
                        fields[4], fields[5].trim().isEmpty() ? 0.0 : Double.parseDouble(fields[5]),
                        fields[6].trim().isEmpty() ? 0.0 : Double.parseDouble(fields[6]),
                        fields[8], Double.parseDouble(fields[15]));
            }
        });
// Register the stream as a view so it can be queried through the Table API.
tableEnv.createTemporaryView("CreditDetails", creditStream);
Table creditDetailsTable = tableEnv.from("CreditDetails");

Table resultsTable = creditDetailsTable.select($("*"))
        .filter($("loanStatus").isEqual("Charged Off"));

TableResult result = resultsTable.execute();

result.print();

Table resultsTable2 = creditDetailsTable.select($("*"))
        .filter($("loanStatus").isEqual("Fully Paid"));

TableResult result2 = resultsTable2.execute();

result2.print();

The code above creates two separate jobs, which is not what I want. Is there a way around this?
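One direction that may help, sketched here under assumptions and not taken from the post: each `Table.execute()` call submits its own job, whereas a `StatementSet` collects several insert pipelines and submits them together with a single `execute()`. In the sketch below, the two-column `fromValues` table is only a stand-in for `CreditDetails`; the class name, the `loanId` column, and the sink names `ChargedOffSink` and `FullyPaidSink` are hypothetical. It assumes Flink 1.12 or later with the built-in `print` connector.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.row;

public class SingleJobTwoQueries {

    // Builds one StatementSet holding both filter queries (names are hypothetical).
    static StatementSet buildStatementSet(StreamTableEnvironment tableEnv) {
        // Assumption: a tiny in-memory table standing in for the CreditDetails view.
        Table credit = tableEnv.fromValues(
                DataTypes.ROW(
                        DataTypes.FIELD("loanId", DataTypes.STRING()),
                        DataTypes.FIELD("loanStatus", DataTypes.STRING())),
                row("L1", "Charged Off"),
                row("L2", "Fully Paid"));

        // Each query needs an explicit sink; the print connector writes rows to stdout.
        tableEnv.executeSql(
                "CREATE TABLE ChargedOffSink (loanId STRING, loanStatus STRING) "
                        + "WITH ('connector' = 'print', 'print-identifier' = 'charged-off')");
        tableEnv.executeSql(
                "CREATE TABLE FullyPaidSink (loanId STRING, loanStatus STRING) "
                        + "WITH ('connector' = 'print', 'print-identifier' = 'fully-paid')");

        // Adding an insert only records the pipeline; nothing is submitted yet.
        StatementSet statements = tableEnv.createStatementSet();
        statements.addInsert("ChargedOffSink",
                credit.filter($("loanStatus").isEqual("Charged Off")));
        statements.addInsert("FullyPaidSink",
                credit.filter($("loanStatus").isEqual("Fully Paid")));
        return statements;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // A single execute() submits both inserts together; await() blocks until done.
        buildStatementSet(tableEnv).execute().await();
    }
}
```

Applied to the code in the question, this would mean replacing the two `execute()`/`print()` pairs with two `addInsert` calls on `resultsTable` and `resultsTable2`, with sink schemas matching the fields of `CreditRecord`. Calling `explain()` on the statement set before executing it should show both sinks in one plan.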
