Flink SQL: "Result field does not match requested type" error on LocalDateTime

Posted by ryevplcw on 2021-06-24 in Flink

When I run the grouped SELECT below, I get a type mismatch error. I have tried casting the field to TIMESTAMP and changing the type of the POJO's LocalDateTime field. Most sample code converts the result to Row.class; I could not find any example that uses a custom class.

    SELECT name,
           MIN(price) AS minPrice,
           MAX(price) AS maxPrice,
           AVG(price) AS avarage,
           COUNT(name) AS sayi,
           TUMBLE_START(rowtime, INTERVAL '5' SECOND) AS zaman
    FROM STOCK
    GROUP BY TUMBLE(rowtime, INTERVAL '5' SECOND), name

Error thrown:

    Exception in thread "main" org.apache.flink.table.api.TableException: Result field 'zaman' does not match requested type. Requested: GenericType<java.time.LocalDateTime>; Actual: LocalDateTime

Code:

    // Register the input stream as table STOCK; "rowtime.rowtime" marks the event-time attribute.
    tableEnvironment.registerDataStream("STOCK", messageStream, "name, price, rowtime.rowtime");

    // Aggregate prices per name over 5-second tumbling windows.
    Table result = tableEnvironment.sqlQuery(
            "SELECT name, MIN(price) AS minPrice, MAX(price) AS maxPrice, AVG(price) AS avarage, COUNT(name) as sayi, TUMBLE_START(rowtime, INTERVAL '5' SECOND) AS zaman FROM STOCK GROUP BY TUMBLE(rowtime, INTERVAL '5' SECOND), name");

    result.printSchema();

    // Kafka sink that writes each result as a string to topic STOCKGROUP.
    FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<String>(kp.getProducerProperties().getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG),
            "STOCKGROUP", new SimpleStringSchema());

    myProducer.setWriteTimestampToKafka(true);

    // This conversion to the StockGroup POJO is where the TableException is thrown.
    DataStream<Tuple2<Boolean, StockGroup>> stream = tableEnvironment.toRetractStream(result, StockGroup.class);

    stream.map(x -> x.f1.toString()).addSink(myProducer);

The StockGroup POJO class:

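    import java.time.LocalDateTime;

    public class StockGroup {
        public String name;
        public Double minPrice;
        public Double maxPrice;
        public Double avarage;
        public Long sayi;
        public LocalDateTime zaman;  // window start; the field named in the error
    }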

Output of printSchema():

root
 |-- name: STRING
 |-- minPrice: DOUBLE
 |-- maxPrice: DOUBLE
 |-- avarage: DOUBLE
 |-- sayi: BIGINT NOT NULL
 |-- zaman: TIMESTAMP(3) *ROWTIME*
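
The question mentions that most sample code converts the result to Row.class rather than to a custom class. One possible direction, offered only as a sketch and not as a confirmed fix for this error, is to request Row.class and build the POJO by hand. The plain-Java example below shows only that mapping step, without Flink dependencies. The names `StockGroupMapper`, `fromFields`, and `toLocalDateTime` are hypothetical, and the `Object[]` argument is an assumed stand-in for the positional values of a Row in SELECT order. The time value is accepted as either `java.time.LocalDateTime` or `java.sql.Timestamp`, since the runtime class of that field may depend on the planner and Flink version in use.

```java
import java.sql.Timestamp;
import java.time.LocalDateTime;

public class StockGroupMapper {

    // Same fields as the StockGroup POJO in the question.
    public static class StockGroup {
        public String name;
        public Double minPrice;
        public Double maxPrice;
        public Double avarage;
        public Long sayi;
        public LocalDateTime zaman;
    }

    // Hypothetical helper: 'fields' stands in for the positional values of a Row,
    // in the order of the SELECT list (name, minPrice, maxPrice, avarage, sayi, zaman).
    public static StockGroup fromFields(Object[] fields) {
        if (fields == null || fields.length != 6) {
            throw new IllegalArgumentException("Expected exactly 6 fields");
        }
        StockGroup g = new StockGroup();
        g.name = (String) fields[0];
        g.minPrice = (Double) fields[1];
        g.maxPrice = (Double) fields[2];
        g.avarage = (Double) fields[3];
        g.sayi = (Long) fields[4];
        g.zaman = toLocalDateTime(fields[5]);
        return g;
    }

    // Accepts either time representation and normalizes it to LocalDateTime.
    static LocalDateTime toLocalDateTime(Object value) {
        if (value == null) {
            return null;
        }
        if (value instanceof LocalDateTime) {
            return (LocalDateTime) value;
        }
        if (value instanceof Timestamp) {
            return ((Timestamp) value).toLocalDateTime();
        }
        throw new IllegalArgumentException(
                "Unsupported time type: " + value.getClass().getName());
    }

    public static void main(String[] args) {
        // Made-up sample values, for illustration only.
        Object[] fields = {"ACME", 1.0, 3.0, 2.0, 2L,
                LocalDateTime.of(2021, 6, 24, 12, 0, 0)};
        StockGroup g = fromFields(fields);
        System.out.println(g.name + " " + g.zaman);
    }
}
```

In the job itself, a mapping like this would sit in a map function applied to the Row-typed stream, in place of passing StockGroup.class to the conversion call. Whether that avoids the exception in a given setup is an assumption that would need to be checked against the Flink version in use.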
