CREATE TABLE user_log (
a STRING,
b STRING
) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'test',
'connector.properties.0.key' = 'zookeeper.connect',
'connector.properties.0.value' = '',
'connector.properties.1.key' = 'bootstrap.servers',
'connector.properties.1.value' = '',
'update-mode' = 'append',
'format.type' = 'json',
'format.derive-schema' = 'true',
'format.fail-on-missing-field' = 'false'
);
正确的格式是{“a”:1,“b”:2},但Kafka发送了一个错误的数据:aabb,程序将停止。如何过滤掉sql中格式错误的json
2条答案
按热度按时间nxagd54h1#
定义表的sc配置时,可以设置如下内容:
你想怎么做就怎么做。
ktca8awb2#
在flink1.11(即将发布)中,这些格式选项已经被添加(并且都默认为false)。见flink-17663。
我不确定在早期版本中你打算做什么。