org.apache.flink.api.table.TableException: "Alias on field reference expression expected"

gorkyyrv  posted on 2021-06-25  in  Flink

I am using Apache Flink's Table API on version 1.1-SNAPSHOT to evaluate SQL queries on streams.
Here is my code:

private static final int MAX_RACK_ID = 10;
private static final long PAUSE = 100;
private static final double TEMP_STD = 20;
private static final double TEMP_MEAN = 80;

public static void main(String[] args)
{
     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
     StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

     DataStream<MonitoringEvent> dstream = env.addSource(new MonitoringEventSource(MAX_RACK_ID, PAUSE, TEMP_STD, TEMP_MEAN));
     tableEnv.registerDataStream("TemperatureData", dstream, "rackid,temperature,timestamp");

     Table tab1 = tableEnv.sql("select STREAM rackid,temperature,timestamp from TemperatureData where temperature>=100");
     DataStream<TemperatureEvent> tempstream = tableEnv.toDataStream(tab1, TemperatureEvent.class);
     tempstream.print();
}

When I run this program, it throws the following exception:

Exception in thread "main" org.apache.flink.api.table.TableException: Alias on field reference expression expected.
    at org.apache.flink.api.table.TableEnvironment$$anonfun$4.apply(TableEnvironment.scala:299)
    at org.apache.flink.api.table.TableEnvironment$$anonfun$4.apply(TableEnvironment.scala:292)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
    at org.apache.flink.api.table.TableEnvironment.getFieldInfo(TableEnvironment.scala:292)
    at org.apache.flink.api.table.StreamTableEnvironment.registerDataStreamInternal(StreamTableEnvironment.scala:212)
    at org.apache.flink.api.java.table.StreamTableEnvironment.registerDataStream(StreamTableEnvironment.scala:130)
    at com.yash.flink.Program.main(Program.java:31)

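I have a few questions:
What is the way to write SQL queries on streams using Apache Flink's Table API?
How can I implement this query in Flink?
Is this a bug in Flink's Table API?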

5f0d552i  #1

You have run into a limitation/bug in the Table API. The problem arises when a DataStream<MonitoringEvent> is registered as a table. You should be able to do this

tableEnv.registerDataStream(
  "TemperatureData", 
  dstream,
  "rackid AS rackid, temperature AS temperature, timestamp AS timestamp"
);

to make it work. I will make sure the issue is fixed before StreamSQL is released in Flink 1.1.0.
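
If the stream has many fields, the aliased field string used in the workaround can be generated instead of typed by hand. The following is only a minimal sketch in plain Java, assuming the "name AS name" format shown in the registerDataStream call above; the class FieldAliases and the method aliasAll are hypothetical names for illustration and are not part of Flink.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class FieldAliases {

    // Hypothetical helper (not a Flink API): maps each field name to
    // "name AS name" and joins the results with ", ".
    public static String aliasAll(List<String> fieldNames) {
        return fieldNames.stream()
                .map(name -> name + " AS " + name)
                .collect(Collectors.joining(", "));
    }

    public static void main(String[] args) {
        String fields = aliasAll(Arrays.asList("rackid", "temperature", "timestamp"));
        // prints: rackid AS rackid, temperature AS temperature, timestamp AS timestamp
        System.out.println(fields);
        // The string would then be passed as the third argument, e.g.
        // tableEnv.registerDataStream("TemperatureData", dstream, fields);
    }
}
```

This only builds the string; whether the aliasing is still needed depends on the Flink version in use, since the issue was expected to be fixed before the 1.1.0 release.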
