ksqldb-stream不使用主题键生成任何结果

b09cbbtk  于 2021-07-15  发布在  Kafka
关注(0)|答案(0)|浏览(213)

用例:创建一个订单系统来处理移动设备和苹果设备的订单。
收到订单
一旦收到订单,就需要对通过和失败的结果进行3种不同的验证。
一旦所有的验证结果都执行了,我们需要汇总所有通过的订单,并且计数为3。满足上述验证条件的订单需要更新订单状态(从“已创建”到“已验证”),以便进一步处理。
主题:订单->保留所有订单的记录订单验证->保留主题订单中所有订单上发生的所有验证的记录。有3种类型的验证发生,因此每个订单在本主题中将有3条状态为“通过”或“失败”以及订单id的记录。
解决方案需要创建两个主题->
订单->接受所有订单
订单验证。->保留所有订单验证结果
注意:我在一个docker容器中使用ksqldb cli(6.0.0)。
ksql处理:
已经从两个主题s_o(来自ordres)和s_ov(订单验证)创建了2个流。

  1. -- Create Stream of Orders topic as s_o
  2. CREATE stream s_o (id integer , quantity integer,price integer,customerId integer,OrderState varchar, product varchar ) WITH (KAFKA_TOPIC='orders', VALUE_FORMAT='JSON')
  3. -- Create Stream of Order-validations topic s_ot
  4. CREATE STREAM s_ov (OrderValidation STRUCT <OrderValidationResult VARCHAR, OrderValidationType VARCHAR,orderId integer> ) WITH (KAFKA_TOPIC='order-validations', VALUE_FORMAT='JSON');

将订单状态为“created”且窗口滚动时间为60秒的计数进行聚合,并存储在名为t\u s\u o的表中;

  1. -- Create a table to aggregate count based on validation resuts and group by orderId
  2. CREATE TABLE t_s_ov AS select ORDERVALIDATION->orderid as id, count(*) as total from s_ov window tumbling (size 60 seconds) where ORDERVALIDATION->ORDERVALIDATIONRESULT ='PASS' group by ORDERVALIDATION->orderid emit changes;

当计数为3或更多时(如果计数为3,则所有验证都通过),将来自s\u o和t\u s\u ov的订单合并,以将订单状态更新为“已验证”
因为ksql不允许持久表和非持久流之间的连接,所以我从主题“t\u s\u ov”创建了一个流(主题作为步骤2的一部分在内部创建)

  1. CREATE STREAM s_t_s_ov (id integer KEY, total integer ) WITH (KAFKA_TOPIC='T_S_OV',VALUE_FORMAT='JSON') ;

验证步骤4的结果

  1. Details:
  2. ksql> DROP STREAM IF EXISTS s_t_s_ov;
  3. >CREATE STREAM s_t_s_ov (id integer KEY, total integer ) WITH (KAFKA_TOPIC='T_S_OV',VALUE_FORMAT='JSON') ;
  4. DROP STREAM IF EXISTS s_t_s_ov;
  5. Message
  6. ------------------------------------------------
  7. Source `S_T_S_OV` (topic: T_S_OV) was dropped.
  8. ------------------------------------------------
  9. CREATE STREAM s_t_s_ov (id integer KEY, total integer ) WITH (KAFKA_TOPIC='T_S_OV',VALUE_FORMAT='JSON') ;
  10. Message
  11. ----------------
  12. Stream created
  13. ----------------
  14. ksql> select * from s_t_s_ov emit changes limit 3;
  15. +--------------------------------------------------------+--------------------------------------------------------+
  16. |ID |TOTAL |
  17. +--------------------------------------------------------+--------------------------------------------------------+
  18. Press CTRL-C to interrupt

有人知道我做错了什么吗?或任何其他可能的方法来实现这一点。
订单流

  1. ksql> describe extended s_o;
  2. Name : S_O
  3. Type : STREAM
  4. Timestamp field : Not set - using <ROWTIME>
  5. Key format : KAFKA
  6. Value format : JSON
  7. Kafka topic : orders (partitions: 1, replication: 1)
  8. Statement : CREATE stream s_o (id integer , quantity integer,price integer,customerId integer,OrderState varchar, product varchar ) WITH (KAFKA_TOPIC='orders', VALUE_FORMAT='JSON')
  9. ;
  10. Field | Type
  11. ------------------------------
  12. ID | INTEGER
  13. QUANTITY | INTEGER
  14. PRICE | INTEGER
  15. CUSTOMERID | INTEGER
  16. ORDERSTATE | VARCHAR(STRING)
  17. PRODUCT | VARCHAR(STRING)
  18. ------------------------------
  19. Local runtime statistics
  20. ------------------------
  21. consumer-messages-per-sec: 0.27 consumer-total-bytes: 6086118 consumer-total-messages: 34341 last-message: 2021-04-26T15:08:40.681Z
  22. consumer-failed-messages: 5802 consumer-failed-messages-per-sec: 0 last-failed: 2021-04-23T04:13:57.008Z
  23. (Statistics of the local KSQL server interaction with the Kafka topic orders)

订单验证流

  1. ksql> describe extended s_ov;
  2. Name : S_OV
  3. Type : STREAM
  4. Timestamp field : Not set - using <ROWTIME>
  5. Key format : KAFKA
  6. Value format : JSON
  7. Kafka topic : order-validations (partitions: 1, replication: 1)
  8. Statement : CREATE STREAM s_ov (OrderValidation STRUCT <OrderValidationResult VARCHAR, OrderValidationType VARCHAR,orderId integer> ) WITH (KAFKA_TOPIC='order-validations', VALUE_FORMAT='JSON');
  9. Field | Type
  10. ---------------------------------------------------------------------------------------------------------------------
  11. ORDERVALIDATION | STRUCT<ORDERVALIDATIONRESULT VARCHAR(STRING), ORDERVALIDATIONTYPE VARCHAR(STRING), ORDERID INTEGER>
  12. ---------------------------------------------------------------------------------------------------------------------
  13. Queries that read from this STREAM
  14. -----------------------------------
  15. CTAS_T_S_OV_1_155 (RUNNING) : CREATE TABLE T_S_OV_1 WITH (KAFKA_TOPIC='T_S_OV_1', PARTITIONS=1, REPLICAS=1) AS SELECT S_OV.ORDERVALIDATION->ORDERID ID, COUNT(*) TOTAL FROM S_OV S_OV WINDOW TUMBLING ( SIZE 60 SECONDS ) WHERE (S_OV.ORDERVALIDATION->ORDERVALIDATIONRESULT = 'PASS') GROUP BY S_OV.ORDERVALIDATION->ORDERID EMIT CHANGES;
  16. CTAS_T_S_OV_85 (RUNNING) : CREATE TABLE T_S_OV WITH (KAFKA_TOPIC='T_S_OV', PARTITIONS=1, REPLICAS=1) AS SELECT S_OV.ORDERVALIDATION->ORDERID ORDERID, COUNT(*) TOTAL FROM S_OV S_OV WINDOW TUMBLING ( SIZE 60 SECONDS ) WHERE (S_OV.ORDERVALIDATION->ORDERVALIDATIONRESULT = 'PASS') GROUP BY S_OV.ORDERVALIDATION->ORDERID EMIT CHANGES;
  17. For query topology and execution plan please run: EXPLAIN <QueryId>
  18. Local runtime statistics
  19. ------------------------
  20. consumer-messages-per-sec: 1.09 consumer-total-bytes: 10836043 consumer-total-messages: 80796 last-message: 2021-04-27T01:39:21.178Z
  21. consumer-failed-messages: 704 consumer-failed-messages-per-sec: 0 last-failed: 2021-04-26T23:58:47.916Z
  22. (Statistics of the local KSQL server interaction with the Kafka topic order-validations)

订单验证表聚合

  1. ksql> describe extended t_s_ov;
  2. Name : T_S_OV
  3. Type : TABLE
  4. Timestamp field : Not set - using <ROWTIME>
  5. Key format : KAFKA
  6. Value format : JSON
  7. Kafka topic : T_S_OV (partitions: 1, replication: 1)
  8. Statement : CREATE TABLE T_S_OV WITH (KAFKA_TOPIC='T_S_OV', PARTITIONS=1, REPLICAS=1) AS SELECT
  9. S_OV.ORDERVALIDATION->ORDERID ORDERID,
  10. COUNT(*) TOTAL
  11. FROM S_OV S_OV
  12. WINDOW TUMBLING ( SIZE 60 SECONDS )
  13. WHERE (S_OV.ORDERVALIDATION->ORDERVALIDATIONRESULT = 'PASS')
  14. GROUP BY S_OV.ORDERVALIDATION->ORDERID
  15. EMIT CHANGES;
  16. Field | Type
  17. ------------------------------------------------------------------
  18. ORDERID | INTEGER (primary key) (Window type: TUMBLING)
  19. TOTAL | BIGINT
  20. ------------------------------------------------------------------
  21. Queries that write from this TABLE
  22. -----------------------------------
  23. CTAS_T_S_OV_85 (RUNNING) : CREATE TABLE T_S_OV WITH (KAFKA_TOPIC='T_S_OV', PARTITIONS=1, REPLICAS=1) AS SELECT S_OV.ORDERVALIDATION->ORDERID ORDERID, COUNT(*) TOTAL FROM S_OV S_OV WINDOW TUMBLING ( SIZE 60 SECONDS ) WHERE (S_OV.ORDERVALIDATION->ORDERVALIDATIONRESULT = 'PASS') GROUP BY S_OV.ORDERVALIDATION->ORDERID EMIT CHANGES;
  24. For query topology and execution plan please run: EXPLAIN <QueryId>
  25. Local runtime statistics
  26. ------------------------
  27. messages-per-sec: 0.09 total-messages: 7730 last-message: 2021-04-27T01:40:16.45Z
  28. (Statistics of the local KSQL server interaction with the Kafka topic T_S_OV)

来自订单验证表的流

  1. ksql> describe extended s_t_s_ov;
  2. Name : S_T_S_OV
  3. Type : STREAM
  4. Timestamp field : Not set - using <ROWTIME>
  5. Key format : KAFKA
  6. Value format : JSON
  7. Kafka topic : T_S_OV (partitions: 1, replication: 1)
  8. Statement : CREATE STREAM s_t_s_ov (id integer KEY, total integer ) WITH (KAFKA_TOPIC='T_S_OV',VALUE_FORMAT='JSON') ;
  9. Field | Type
  10. --------------------------------
  11. ID | INTEGER (key)
  12. TOTAL | INTEGER
  13. --------------------------------
  14. Local runtime statistics
  15. ------------------------
  16. messages-per-sec: 0.09 total-messages: 7737 last-message: 2021-04-27T01:41:31.567Z
  17. (Statistics of the local KSQL server interaction with the Kafka topic T_S_OV)

注意:消息总数:7737,但是它不显示任何结果。
我尝试了第3步的另一个版本,即orders id值为空的results。

  1. ksql> CREATE STREAM s_t_s_ov1 (id integer, total integer ) WITH (KAFKA_TOPIC='T_S_OV',VALUE_FORMAT='JSON') ;
  2. >
  3. Message
  4. ----------------
  5. Stream created
  6. ----------------
  7. ksql> select * from s_t_s_ov1 emit changes limit 3;
  8. +--------------------------------------------------------+--------------------------------------------------------+
  9. |ID |TOTAL |
  10. +--------------------------------------------------------+--------------------------------------------------------+
  11. |null |1 |
  12. |null |1 |
  13. |null |1 |
  14. Limit Reached
  15. Query terminated

注意,我已经从语句中删除了key。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题