无法在ksql中创建流

hk8txs48  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(374)

我有如下流,我想从这个创建另一个流。我尝试下面的命令,我得到以下错误。我错过什么了吗?

  1. ksql> create stream down_devices_stream as select * from fakedata119 where deviceProperties['status']='false';
  2. Failed to generate code for SqlPredicate.filterExpression: (FAKEDATA119.DEVICEPROPERTIES['status'] = 'false')schema:org.apache.kafka.connect.data.SchemaBuilder@6e18dbbfisWindowedKey:false
  3. Caused by: Line 1, Column 180: Operator "<=" not allowed on reference operands
  4. ksql> select * from fakedata119;
  5. 1529505497087 | null | 19 | visibility sensors | Wed Jun 20 16:38:17 CEST 2018 | {visibility=74, status=true}
  6. 1529505498087 | null | 7 | fans | Wed Jun 20 16:38:18 CEST 2018 | {temperature=44, rotationSense=1, status=false, frequency=49}
  7. 1529505499088 | null | 28 | air quality monitors | Wed Jun 20 16:38:19 CEST 2018 | {coPpm=257, status=false, Co2Ppm=134}
  8. 1529505500089 | null | 4 | fans | Wed Jun 20 16:38:20 CEST 2018 | {temperature=42, rotationSense=1, status=true, frequency=51}
  9. 1529505501089 | null | 23 | air quality monitors | Wed Jun 20 16:38:21 CEST 2018 | {coPpm=158, status=true, Co2Ppm=215}
  10. sql> describe fakedata119;
  11. Field | Type
  12. ---------------------------------------------------------
  13. ROWTIME | BIGINT (system)
  14. ROWKEY | VARCHAR(STRING) (system)
  15. DEVICEID | INTEGER
  16. CATEGORY | VARCHAR(STRING)
  17. TIMESTAMP | VARCHAR(STRING)
  18. DEVICEPROPERTIES | MAP[VARCHAR(STRING),VARCHAR(STRING)]
af7jpaap

af7jpaap1#

我添加了一个测试来覆盖这个用例:
https://github.com/confluentinc/ksql/pull/1476/files
有趣的是,这将传递给我们的主分支和即将到来的5.0分支,但在4.1中失败。
所以。。。看起来这是您正在使用的版本中的一个问题,但好消息是它在即将发布的版本中得到了修复。另外,你现在可以利用罗宾的工作。
快乐查询!
安迪

eeq64g8w

eeq64g8w2#

在没有看到您的输入数据的情况下,我猜它看起来是这样的:

  1. {
  2. "id": "a42",
  3. "category": "foo",
  4. "timestamp": "2018-06-21 10:04:57 BST",
  5. "deviceID": 42,
  6. "deviceProperties": {
  7. "status": "false",
  8. "foo": "bar"
  9. }
  10. }

如果是这样,你最好用 EXTRACTJSONFIELD 访问嵌套值,并生成 predicate 。

  1. CREATE STREAM test (Id VARCHAR, category VARCHAR, timeStamp VARCHAR, \
  2. deviceID INTEGER, deviceProperties VARCHAR) \
  3. WITH (KAFKA_TOPIC='test_map2', VALUE_FORMAT='JSON');
  4. ksql> SELECT EXTRACTJSONFIELD(DEVICEPROPERTIES,'$.status') AS STATUS FROM fakeData223;
  5. false
  6. ksql> SELECT * FROM fakeData223 \
  7. WHERE EXTRACTJSONFIELD(DEVICEPROPERTIES,'$.status')='false';
  8. 1529572405759 | null | a42 | foo | 2018-06-21 10:04:57 BST | 42 | {"status":"false","foo":"bar"}

您发现的错误我已记录为要在此处跟踪的错误:https://github.com/confluentinc/ksql/issues/1474

展开查看全部

相关问题