I am trying to write from one KDS (Kinesis Data Stream) to another after processing in Apache Flink. I created the sink table with the following query in a Zeppelin notebook:
%flink.ssql
CREATE TABLE seller_revenue (
    seller_id VARCHAR,
    window_end TIMESTAMP,
    sales DOUBLE
)
WITH (
    'connector' = 'kinesis',
    'stream' = 'seller_stream_window',
    'aws.region' = 'ap-south-1',
    'scan.stream.initpos' = 'LATEST',
    'format' = 'json'
)
Then I write data into it using the following:
%flink.ssql(parallelism=1)
INSERT INTO seller_revenue
SELECT
    seller_id,
    TUMBLE_END(proctime, INTERVAL '30' SECONDS) AS window_end,
    SUM(product_quantity * product_price) AS sales
FROM seller_sales
GROUP BY
    TUMBLE(proctime, INTERVAL '30' SECONDS),
    seller_id
But I get the following error:
Unable to create a sink for writing table 'hive.flink2.seller_revenue'.
Caused by: org.apache.flink.table.api.ValidationException: Unsupported options found for 'kinesis'.
Unsupported options:
scan.stream.initpos
Can anyone help resolve this?
I tried removing the unsupported option scan.stream.initpos, but no data was written after that.
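For context, scan.stream.initpos is a scan (source-side) option of the Flink Kinesis connector, which is why validation rejects it on a sink table. A minimal sink DDL would omit it, keeping only sink-relevant options; the commented-out sink.partitioner line is an assumption based on the connector's documented sink options, not something from the original post:

```sql
-- Sketch of a sink-only DDL: no scan.* options in the WITH clause.
CREATE TABLE seller_revenue (
    seller_id VARCHAR,
    window_end TIMESTAMP,
    sales DOUBLE
)
WITH (
    'connector' = 'kinesis',
    'stream' = 'seller_stream_window',
    'aws.region' = 'ap-south-1',
    'format' = 'json'
    -- ,'sink.partitioner' = 'random'  -- optional; verify against your connector version
)
```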
1 Answer
The code will work if you deploy the Zeppelin notebook as a streaming application.
You cannot run these steps inside the Zeppelin notebook itself; I ran into a similar problem.