When I run the GROUP BY query below, I get a type mismatch error. I have already tried converting the field to Timestamp and changing the type of the POJO's LocalDateTime field. Most sample code converts the result to Row.class; I could not find any example that uses a custom class.
SELECT name, MIN(price) AS minPrice, MAX(price) AS maxPrice, AVG(price) AS avarage, COUNT(name) as sayi, TUMBLE_START(rowtime, INTERVAL '5' SECOND) AS zaman FROM STOCK GROUP BY TUMBLE(rowtime, INTERVAL '5' SECOND), name
Error thrown: Exception in thread "main" org.apache.flink.table.api.TableException: Result field 'zaman' does not match requested type. Requested: GenericType<java.time.LocalDateTime>; Actual: LocalDateTime
Code:
tableEnvironment.registerDataStream("STOCK", messageStream, "name, price, rowtime.rowtime");
Table result = tableEnvironment.sqlQuery(
"SELECT name, MIN(price) AS minPrice, MAX(price) AS maxPrice, AVG(price) AS avarage, COUNT(name) as sayi, TUMBLE_START(rowtime, INTERVAL '5' SECOND) AS zaman FROM STOCK GROUP BY TUMBLE(rowtime, INTERVAL '5' SECOND), name");
result.printSchema();
FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<String>(kp.getProducerProperties().getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG),
"STOCKGROUP", new SimpleStringSchema());
myProducer.setWriteTimestampToKafka(true);
DataStream<Tuple2<Boolean, StockGroup>> stream = tableEnvironment.toRetractStream(result, StockGroup.class);
stream.map(x -> x.f1.toString()).addSink(myProducer);
StockGroup POJO class:
public String name;
public Double minPrice;
public Double maxPrice;
public Double avarage;
public Long sayi;
public LocalDateTime zaman;
Printed schema:
root
|-- name: STRING
|-- minPrice: DOUBLE
|-- maxPrice: DOUBLE
|-- avarage: DOUBLE
|-- sayi: BIGINT NOT NULL
|-- zaman: TIMESTAMP(3) *ROWTIME*
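One workaround I am considering, untested against my Flink version, is to stop asking Flink to map the result onto StockGroup directly and instead fill the POJO by hand from the row's field values. Below is a minimal sketch of that mapping step in plain Java. StockGroupMapper, toLocalDateTime and fromFields are hypothetical names that I made up, and the Object[] stands in for the six field values in query order. The helper accepts both java.time.LocalDateTime and java.sql.Timestamp because I am not sure which one the planner hands back for the zaman column.

```java
import java.sql.Timestamp;
import java.time.LocalDateTime;

// Same public fields as the StockGroup POJO shown above.
class StockGroup {
    public String name;
    public Double minPrice;
    public Double maxPrice;
    public Double avarage;
    public Long sayi;
    public LocalDateTime zaman;

    @Override
    public String toString() {
        return name + "," + minPrice + "," + maxPrice + "," + avarage + "," + sayi + "," + zaman;
    }
}

// Hypothetical helper (not part of Flink): builds a StockGroup from raw field values.
final class StockGroupMapper {
    private StockGroupMapper() {}

    // Normalizes the window-start value, whichever of the two time types it arrives as.
    static LocalDateTime toLocalDateTime(Object value) {
        if (value == null) {
            return null;
        }
        if (value instanceof LocalDateTime) {
            return (LocalDateTime) value;
        }
        if (value instanceof Timestamp) {
            return ((Timestamp) value).toLocalDateTime();
        }
        throw new IllegalArgumentException("Unsupported time type: " + value.getClass().getName());
    }

    // Assumes field order matches the SELECT list:
    // name, minPrice, maxPrice, avarage, sayi, zaman.
    static StockGroup fromFields(Object[] fields) {
        if (fields.length != 6) {
            throw new IllegalArgumentException("Expected 6 fields, got " + fields.length);
        }
        StockGroup group = new StockGroup();
        group.name = (String) fields[0];
        group.minPrice = (Double) fields[1];
        group.maxPrice = (Double) fields[2];
        group.avarage = (Double) fields[3];
        group.sayi = (Long) fields[4];
        group.zaman = toLocalDateTime(fields[5]);
        return group;
    }
}
```

In the job itself, this would presumably mean calling toRetractStream(result, Row.class) instead of toRetractStream(result, StockGroup.class) and reading each value with Row.getField(i) inside a map function before calling the helper. That wiring is an assumption on my part and is not shown here.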