ksqldb-stream不使用主题键生成任何结果

b09cbbtk  于 2021-07-15  发布在  Kafka
关注(0)|答案(0)|浏览(188)

用例:创建一个订单系统来处理移动设备和苹果设备的订单。
收到订单
一旦收到订单,就需要对通过和失败的结果进行3种不同的验证。
一旦所有的验证结果都执行了,我们需要汇总所有通过的订单,并且计数为3。满足上述验证条件的订单需要更新订单状态(从“已创建”到“已验证”),以便进一步处理。
主题:订单->保留所有订单的记录订单验证->保留主题订单中所有订单上发生的所有验证的记录。有3种类型的验证发生,因此每个订单在本主题中将有3条状态为“通过”或“失败”以及订单id的记录。
解决方案需要创建两个主题->
订单->接受所有订单
订单验证。->保留所有订单验证结果
注意:我在一个docker容器中使用ksqldb cli(6.0.0)。
ksql处理:
已经从两个主题s_o(来自ordres)和s_ov(订单验证)创建了2个流。

-- Create Stream of Orders topic as s_o
CREATE stream s_o  (id integer , quantity integer,price integer,customerId integer,OrderState varchar, product varchar )  WITH (KAFKA_TOPIC='orders', VALUE_FORMAT='JSON')
-- Create Stream of Order-validations topic  s_ot

CREATE STREAM s_ov  (OrderValidation STRUCT <OrderValidationResult VARCHAR, OrderValidationType VARCHAR,orderId integer> )  WITH (KAFKA_TOPIC='order-validations', VALUE_FORMAT='JSON');

将订单状态为“created”且窗口滚动时间为60秒的计数进行聚合,并存储在名为t\u s\u o的表中;

-- Create a table to aggregate count based on validation resuts and group by orderId
CREATE TABLE t_s_ov  AS select ORDERVALIDATION->orderid as id,  count(*) as total  from s_ov  window tumbling (size 60 seconds)   where ORDERVALIDATION->ORDERVALIDATIONRESULT ='PASS'   group by ORDERVALIDATION->orderid emit changes;

当计数为3或更多时(如果计数为3,则所有验证都通过),将来自s\u o和t\u s\u ov的订单合并,以将订单状态更新为“已验证”
因为ksql不允许持久表和非持久流之间的连接,所以我从主题“t\u s\u ov”创建了一个流(主题作为步骤2的一部分在内部创建)

CREATE STREAM s_t_s_ov  (id integer KEY, total integer )  WITH (KAFKA_TOPIC='T_S_OV',VALUE_FORMAT='JSON') ;

验证步骤4的结果

Details:

ksql> DROP STREAM IF EXISTS s_t_s_ov;
>CREATE STREAM s_t_s_ov  (id integer KEY, total integer )  WITH (KAFKA_TOPIC='T_S_OV',VALUE_FORMAT='JSON') ;

DROP STREAM IF EXISTS s_t_s_ov;
 Message
------------------------------------------------
 Source `S_T_S_OV` (topic: T_S_OV) was dropped.
------------------------------------------------

CREATE STREAM s_t_s_ov  (id integer KEY, total integer )  WITH (KAFKA_TOPIC='T_S_OV',VALUE_FORMAT='JSON') ;
 Message
----------------
 Stream created
----------------
ksql> select * from s_t_s_ov emit changes limit 3;
+--------------------------------------------------------+--------------------------------------------------------+
|ID                                                      |TOTAL                                                   |
+--------------------------------------------------------+--------------------------------------------------------+

Press CTRL-C to interrupt

有人知道我做错了什么吗?或任何其他可能的方法来实现这一点。
订单流

ksql> describe extended s_o;

Name                 : S_O
Type                 : STREAM
Timestamp field      : Not set - using <ROWTIME>
Key format           : KAFKA
Value format         : JSON
Kafka topic          : orders (partitions: 1, replication: 1)
Statement            : CREATE stream s_o  (id integer , quantity integer,price integer,customerId integer,OrderState varchar, product varchar )  WITH (KAFKA_TOPIC='orders', VALUE_FORMAT='JSON')
;

 Field      | Type
------------------------------
 ID         | INTEGER
 QUANTITY   | INTEGER
 PRICE      | INTEGER
 CUSTOMERID | INTEGER
 ORDERSTATE | VARCHAR(STRING)
 PRODUCT    | VARCHAR(STRING)
------------------------------

Local runtime statistics
------------------------
consumer-messages-per-sec:      0.27 consumer-total-bytes:   6086118 consumer-total-messages:     34341     last-message: 2021-04-26T15:08:40.681Z
consumer-failed-messages:      5802 consumer-failed-messages-per-sec:         0      last-failed: 2021-04-23T04:13:57.008Z
(Statistics of the local KSQL server interaction with the Kafka topic orders)

订单验证流

ksql> describe extended s_ov;

Name                 : S_OV
Type                 : STREAM
Timestamp field      : Not set - using <ROWTIME>
Key format           : KAFKA
Value format         : JSON
Kafka topic          : order-validations (partitions: 1, replication: 1)
Statement            : CREATE STREAM s_ov  (OrderValidation STRUCT <OrderValidationResult VARCHAR, OrderValidationType VARCHAR,orderId integer> )  WITH (KAFKA_TOPIC='order-validations', VALUE_FORMAT='JSON');

 Field           | Type                                                                                              
---------------------------------------------------------------------------------------------------------------------
 ORDERVALIDATION | STRUCT<ORDERVALIDATIONRESULT VARCHAR(STRING), ORDERVALIDATIONTYPE VARCHAR(STRING), ORDERID INTEGER>
---------------------------------------------------------------------------------------------------------------------

Queries that read from this STREAM
-----------------------------------
CTAS_T_S_OV_1_155 (RUNNING) : CREATE TABLE T_S_OV_1 WITH (KAFKA_TOPIC='T_S_OV_1', PARTITIONS=1, REPLICAS=1) AS SELECT   S_OV.ORDERVALIDATION->ORDERID ID,   COUNT(*) TOTAL FROM S_OV S_OV WINDOW TUMBLING ( SIZE 60 SECONDS )  WHERE (S_OV.ORDERVALIDATION->ORDERVALIDATIONRESULT = 'PASS') GROUP BY S_OV.ORDERVALIDATION->ORDERID EMIT CHANGES;
CTAS_T_S_OV_85 (RUNNING) : CREATE TABLE T_S_OV WITH (KAFKA_TOPIC='T_S_OV', PARTITIONS=1, REPLICAS=1) AS SELECT   S_OV.ORDERVALIDATION->ORDERID ORDERID,   COUNT(*) TOTAL FROM S_OV S_OV WINDOW TUMBLING ( SIZE 60 SECONDS )  WHERE (S_OV.ORDERVALIDATION->ORDERVALIDATIONRESULT = 'PASS') GROUP BY S_OV.ORDERVALIDATION->ORDERID EMIT CHANGES;

For query topology and execution plan please run: EXPLAIN <QueryId>

Local runtime statistics
------------------------
consumer-messages-per-sec:      1.09 consumer-total-bytes:  10836043 consumer-total-messages:     80796     last-message: 2021-04-27T01:39:21.178Z
consumer-failed-messages:       704 consumer-failed-messages-per-sec:         0      last-failed: 2021-04-26T23:58:47.916Z
(Statistics of the local KSQL server interaction with the Kafka topic order-validations)

订单验证表聚合

ksql> describe extended t_s_ov;

Name                 : T_S_OV
Type                 : TABLE
Timestamp field      : Not set - using <ROWTIME>
Key format           : KAFKA
Value format         : JSON
Kafka topic          : T_S_OV (partitions: 1, replication: 1)
Statement            : CREATE TABLE T_S_OV WITH (KAFKA_TOPIC='T_S_OV', PARTITIONS=1, REPLICAS=1) AS SELECT
  S_OV.ORDERVALIDATION->ORDERID ORDERID,
  COUNT(*) TOTAL
FROM S_OV S_OV
WINDOW TUMBLING ( SIZE 60 SECONDS )
WHERE (S_OV.ORDERVALIDATION->ORDERVALIDATIONRESULT = 'PASS')
GROUP BY S_OV.ORDERVALIDATION->ORDERID
EMIT CHANGES;

 Field   | Type
------------------------------------------------------------------
 ORDERID | INTEGER          (primary key) (Window type: TUMBLING)
 TOTAL   | BIGINT
------------------------------------------------------------------

Queries that write from this TABLE
-----------------------------------
CTAS_T_S_OV_85 (RUNNING) : CREATE TABLE T_S_OV WITH (KAFKA_TOPIC='T_S_OV', PARTITIONS=1, REPLICAS=1) AS SELECT   S_OV.ORDERVALIDATION->ORDERID ORDERID,   COUNT(*) TOTAL FROM S_OV S_OV WINDOW TUMBLING ( SIZE 60 SECONDS )  WHERE (S_OV.ORDERVALIDATION->ORDERVALIDATIONRESULT = 'PASS') GROUP BY S_OV.ORDERVALIDATION->ORDERID EMIT CHANGES;

For query topology and execution plan please run: EXPLAIN <QueryId>

Local runtime statistics
------------------------
messages-per-sec:      0.09   total-messages:      7730     last-message: 2021-04-27T01:40:16.45Z

(Statistics of the local KSQL server interaction with the Kafka topic T_S_OV)

来自订单验证表的流

ksql> describe extended s_t_s_ov;

Name                 : S_T_S_OV
Type                 : STREAM
Timestamp field      : Not set - using <ROWTIME>
Key format           : KAFKA
Value format         : JSON
Kafka topic          : T_S_OV (partitions: 1, replication: 1)
Statement            : CREATE STREAM s_t_s_ov  (id integer KEY, total integer )  WITH (KAFKA_TOPIC='T_S_OV',VALUE_FORMAT='JSON') ;

 Field | Type
--------------------------------
 ID    | INTEGER          (key)
 TOTAL | INTEGER
--------------------------------

Local runtime statistics
------------------------
messages-per-sec:      0.09   total-messages:      7737     last-message: 2021-04-27T01:41:31.567Z

(Statistics of the local KSQL server interaction with the Kafka topic T_S_OV)

注意:消息总数:7737,但是它不显示任何结果。
我尝试了第3步的另一个版本,即orders id值为空的results。

ksql> CREATE STREAM s_t_s_ov1  (id integer, total integer )  WITH (KAFKA_TOPIC='T_S_OV',VALUE_FORMAT='JSON') ;
>

 Message
----------------
 Stream created
----------------
ksql> select * from s_t_s_ov1 emit changes limit 3;
+--------------------------------------------------------+--------------------------------------------------------+
|ID                                                      |TOTAL                                                   |
+--------------------------------------------------------+--------------------------------------------------------+
|null                                                    |1                                                       |
|null                                                    |1                                                       |
|null                                                    |1                                                       |
Limit Reached
Query terminated

注意,我已经从语句中删除了key。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题