无法使用ksql创建流

jljoyd4f  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(405)

我正在尝试使用ksql创建一个流。

ksql> CREATE STREAM fakeData22 (Id VARCHAR, category VARCHAR, timeStamp VARCHAR, deviceID INTEGER, properties MAP<VARCHAR, VARCHAR>) WITH (KAFKA_TOPIC='fake-data-19', VALUE_FORMAT='JSON');

我得到以下输出。我错过什么了吗?

line 1:94: extraneous input 'properties' expecting {'ADD', 'APPROXIMATE', 'AT', 'CONFIDENCE', 'NO', 'SUBSTRING', 'POSITION', 'TINYINT', 'SMALLINT', 'INTEGER', 'DATE', 'TIME', 'TIMESTAMP', 'INTERVAL', 'YEAR', 'MONTH', 'DAY', 'HOUR', 'MINUTE', 'SECOND', 'ZONE', 'OVER', 'PARTITION', 'RANGE', 'ROWS', 'PRECEDING', 'FOLLOWING', 'CURRENT', 'ROW', 'VIEW', 'REPLACE', 'GRANT', 'REVOKE', 'PRIVILEGES', 'PUBLIC', 'OPTION', 'EXPLAIN', 'ANALYZE', 'FORMAT', 'TYPE', 'TEXT', 'GRAPHVIZ', 'LOGICAL', 'DISTRIBUTED', 'TRY', 'SHOW', 'TABLES', 'SCHEMAS', 'CATALOGS', 'COLUMNS', 'COLUMN', 'USE', 'PARTITIONS', 'FUNCTIONS', 'TO', 'SYSTEM', 'BERNOULLI', 'POISSONIZED', 'TABLESAMPLE', 'RESCALED', 'ARRAY', 'MAP', 'SET', 'RESET', 'SESSION', 'DATA', 'START', 'TRANSACTION', 'COMMIT', 'ROLLBACK', 'WORK', 'ISOLATION', 'LEVEL', 'SERIALIZABLE', 'REPEATABLE', 'COMMITTED', 'UNCOMMITTED', 'READ', 'WRITE', 'ONLY', 'CALL', 'NFD', 'NFC', 'NFKD', 'NFKC', 'IF', 'NULLIF', 'COALESCE', IDENTIFIER, DIGIT_IDENTIFIER, QUOTED_IDENTIFIER, BACKQUOTED_IDENTIFIER}
Caused by: line 1:94: extraneous input 'properties' expecting {'ADD', 'APPROXIMATE', 'AT', 'CONFIDENCE', 'NO', 'SUBSTRING', 'POSITION', 'TINYINT', 'SMALLINT', 'INTEGER', 'DATE', 'TIME', 'TIMESTAMP', 'INTERVAL', 'YEAR', 'MONTH', 'DAY', 'HOUR', 'MINUTE', 'SECOND', 'ZONE', 'OVER', 'PARTITION', 'RANGE', 'ROWS', 'PRECEDING', 'FOLLOWING', 'CURRENT', 'ROW', 'VIEW', 'REPLACE', 'GRANT', 'REVOKE', 'PRIVILEGES', 'PUBLIC', 'OPTION', 'EXPLAIN', 'ANALYZE', 'FORMAT', 'TYPE', 'TEXT', 'GRAPHVIZ', 'LOGICAL', 'DISTRIBUTED', 'TRY', 'SHOW', 'TABLES', 'SCHEMAS', 'CATALOGS', 'COLUMNS', 'COLUMN', 'USE', 'PARTITIONS', 'FUNCTIONS', 'TO', 'SYSTEM', 'BERNOULLI', 'POISSONIZED', 'TABLESAMPLE', 'RESCALED', 'ARRAY', 'MAP', 'SET', 'RESET', 'SESSION', 'DATA', 'START', 'TRANSACTION', 'COMMIT', 'ROLLBACK', 'WORK', 'ISOLATION', 'LEVEL', 'SERIALIZABLE', 'REPEATABLE', 'COMMITTED', 'UNCOMMITTED', 'READ', 'WRITE', 'ONLY', 'CALL', 'NFD', 'NFC', 'NFKD', 'NFKC', 'IF', 'NULLIF', 'COALESCE', IDENTIFIER, DIGIT_IDENTIFIER, QUOTED_IDENTIFIER, BACKQUOTED_IDENTIFIER}
cgfeq70w

cgfeq70w1#

因为ksql不支持backtick转义。如果表列声明为反勾号转义且为大写,则实际的解决方法是:

CREATE STREAM fakeData22 \
    (Id VARCHAR, category VARCHAR, timeStamp VARCHAR, deviceID INTEGER, \
    `PROPERTIES` MAP<VARCHAR, VARCHAR>) \
WITH (KAFKA_TOPIC='fake-data-19', \
    VALUE_FORMAT='JSON');

如下图所示。

ksql> select `PROPERTIES` from fakeData22;

相关的ksqlgithub问题:支持转义标识符名称#677

vatpfxk5

vatpfxk52#

我想 properties 必须是ksql中的保留字。我已经将此添加到ksql项目的一个问题中,供我们跟踪,但在此期间请尝试附上 properties 在backticks中:

CREATE STREAM fakeData22 \
      (Id VARCHAR, category VARCHAR, timeStamp VARCHAR, deviceID INTEGER, \
      `properties` MAP<VARCHAR, VARCHAR>) \
WITH (KAFKA_TOPIC='fake-data-19', \
      VALUE_FORMAT='JSON');

相关问题