我有一个数据流api的工作,这是运行良好,但我需要使用 DataStream<Event>
并将其传递给tableapi以调用register python函数,然后将结果传递回新的 DataStream
重新处理调用的结果。我这里有两个问题,一个是我可以这样运行作业:
/*DataStream Job*/
StreamExecutionEnvironment env = EnvironmentConfiguration.getEnv();
final DataStream<Event> eventsStream = RabbitMQConnector.eventStreamObject(env)
.flatMap(new RabbitMQConsumer())
.uid("cep.objects_mapper_id")
.name("Event Mapper")
.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, timestamp) -> event.timestamp.getTime()))
.name("Watermarks Added");
/*TableAPI job*/
final StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(TableEnvironmentConfiguration.getEnv(), fsSettings);
fsTableEnv.getConfig().getConfiguration().setString("python.files", "test.py");
fsTableEnv.getConfig().getConfiguration().setString("python.client.executable", "python.exe");
fsTableEnv.getConfig().getConfiguration().setString("python.executable", "python.exe");
fsTableEnv.executeSql("CREATE TEMPORARY SYSTEM FUNCTION func1 AS 'test.func1' LANGUAGE PYTHON");
SingleOutputStreamOperator<String> stream = eventsStream.map(x -> x.name);
Table source = fsTableEnv.fromDataStream(stream).as("name");
Table result = source.select("func1(name)");
DataStream<String> finalRes = fsTableEnv.toAppendStream(result, String.class);
finalRes.addSink(new SinkFunction<String>() {
@Override
public void invoke(String value) {
LOG.info("Record from table: " + value);
}
});
env.execute(job_name);
在这个例子中,我没有任何问题,但是python函数永远不会返回,我担心在我返回之前它永远不会被调用 result.exeute();
,然后当我从上面和后面应用相同的例子时
finalRes.addSink(new SinkFunction<String>() {
@Override
public void invoke(String value) {
LOG.info("Record from table: " + value);
}
});
做 result.execute();
为了执行表,python函数可以工作,但是datastreamapi作业在tableapi完成之前永远不会执行,但是由于datastreamapi作业从未初始化,因此使用者不工作,因此应该发送到tableapi然后发送到python函数的流总是空的。
我的问题是:有没有办法并行运行两个作业,或者一个接一个地运行?注意:我会创建一个timertask,在datastreamapi作业启动后等待一段时间,然后启动tableapi作业(使用parallelism 1),它似乎可以工作,但是tableapi作业被创建并停止了很多次。
有没有更好的办法?希望有人能理解我的问题。
谢谢!
暂无答案!
目前还没有任何答案,快来回答吧!