flink表api无法将数据集转换为数据流

zyfwsgd6  于 2021-06-26  发布在  Flink
关注(0)|答案(2)|浏览(443)

我正在使用flink table api使用java,我想将数据集转换为数据流。。。。以下是我的代码:

TableEnvironment tableEnvironment=new TableEnvironment();
Table tab1=table.where("related_value < 2014").select("related_value,ref_id");
DataSet<MyClass>ds2=tableEnvironment.toDataSet(tab1, MyClass.class);
DataStream<MyClass> d=tableEnvironment.toDataStream(tab1, MyClass.class);

但是当我尝试执行这个程序时,它抛出以下异常:
org.apache.flink.api.table.expressionexception:javastreamingtranslator的根无效:根(arrayseq((related\u value,double),(ref\u id,string)))。您是否尝试过将基于数据集的表转换为数据流,或将基于数据集的表转换为基于数据流的表?我想知道如何使用flink table api将数据集转换为数据流??
我想知道的另一件事是,对于模式匹配,有flink cep库可用。但是使用flink table api进行模式匹配可行吗??

kmynzznz

kmynzznz1#

使用时不能转换为数据流api TableEnvironment ,必须创建 StreamTableEnvironment 要从表转换为数据流,请执行以下操作:

final EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
final StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(configuration, fsSettings);
DataStream<String> finalRes = fsTableEnv.toAppendStream(tableNameHere, MyClass.class);

希望能对你有所帮助。
谨致问候!

06odsfpq

06odsfpq2#

flink的表api不是为转换 DataSet 变成一个 DataStream 反之亦然。使用表api是不可能做到这一点的,目前使用flink也没有其他方法可以做到这一点。
统一 DataStream 以及 DataSet api(将批处理作为流的一种特殊情况处理,即作为有界流)是flink的长期路线图。

相关问题