用例:创建一个订单系统来处理移动设备和苹果设备的订单。
收到订单
一旦收到订单,就需要对通过和失败的结果进行3种不同的验证。
一旦所有的验证结果都执行了,我们需要汇总所有通过的订单,并且计数为3。满足上述验证条件的订单需要更新订单状态(从“已创建”到“已验证”),以便进一步处理。
主题:订单->保留所有订单的记录订单验证->保留主题订单中所有订单上发生的所有验证的记录。有3种类型的验证发生,因此每个订单在本主题中将有3条状态为“通过”或“失败”以及订单id的记录。
解决方案需要创建两个主题->
订单->接受所有订单
订单验证。->保留所有订单验证结果
注意:我在一个docker容器中使用ksqldb cli(6.0.0)。
ksql处理:
已经从两个主题s_o(来自ordres)和s_ov(订单验证)创建了2个流。
-- Create Stream of Orders topic as s_o
CREATE stream s_o (id integer , quantity integer,price integer,customerId integer,OrderState varchar, product varchar ) WITH (KAFKA_TOPIC='orders', VALUE_FORMAT='JSON')
-- Create Stream of Order-validations topic s_ot
CREATE STREAM s_ov (OrderValidation STRUCT <OrderValidationResult VARCHAR, OrderValidationType VARCHAR,orderId integer> ) WITH (KAFKA_TOPIC='order-validations', VALUE_FORMAT='JSON');
将订单状态为“created”且窗口滚动时间为60秒的计数进行聚合,并存储在名为t\u s\u o的表中;
-- Create a table to aggregate count based on validation resuts and group by orderId
CREATE TABLE t_s_ov AS select ORDERVALIDATION->orderid as id, count(*) as total from s_ov window tumbling (size 60 seconds) where ORDERVALIDATION->ORDERVALIDATIONRESULT ='PASS' group by ORDERVALIDATION->orderid emit changes;
当计数为3或更多时(如果计数为3,则所有验证都通过),将来自s\u o和t\u s\u ov的订单合并,以将订单状态更新为“已验证”
因为ksql不允许持久表和非持久流之间的连接,所以我从主题“t\u s\u ov”创建了一个流(主题作为步骤2的一部分在内部创建)
CREATE STREAM s_t_s_ov (id integer KEY, total integer ) WITH (KAFKA_TOPIC='T_S_OV',VALUE_FORMAT='JSON') ;
验证步骤4的结果
Details:
ksql> DROP STREAM IF EXISTS s_t_s_ov;
>CREATE STREAM s_t_s_ov (id integer KEY, total integer ) WITH (KAFKA_TOPIC='T_S_OV',VALUE_FORMAT='JSON') ;
DROP STREAM IF EXISTS s_t_s_ov;
Message
------------------------------------------------
Source `S_T_S_OV` (topic: T_S_OV) was dropped.
------------------------------------------------
CREATE STREAM s_t_s_ov (id integer KEY, total integer ) WITH (KAFKA_TOPIC='T_S_OV',VALUE_FORMAT='JSON') ;
Message
----------------
Stream created
----------------
ksql> select * from s_t_s_ov emit changes limit 3;
+--------------------------------------------------------+--------------------------------------------------------+
|ID |TOTAL |
+--------------------------------------------------------+--------------------------------------------------------+
Press CTRL-C to interrupt
有人知道我做错了什么吗?或任何其他可能的方法来实现这一点。
订单流
ksql> describe extended s_o;
Name : S_O
Type : STREAM
Timestamp field : Not set - using <ROWTIME>
Key format : KAFKA
Value format : JSON
Kafka topic : orders (partitions: 1, replication: 1)
Statement : CREATE stream s_o (id integer , quantity integer,price integer,customerId integer,OrderState varchar, product varchar ) WITH (KAFKA_TOPIC='orders', VALUE_FORMAT='JSON')
;
Field | Type
------------------------------
ID | INTEGER
QUANTITY | INTEGER
PRICE | INTEGER
CUSTOMERID | INTEGER
ORDERSTATE | VARCHAR(STRING)
PRODUCT | VARCHAR(STRING)
------------------------------
Local runtime statistics
------------------------
consumer-messages-per-sec: 0.27 consumer-total-bytes: 6086118 consumer-total-messages: 34341 last-message: 2021-04-26T15:08:40.681Z
consumer-failed-messages: 5802 consumer-failed-messages-per-sec: 0 last-failed: 2021-04-23T04:13:57.008Z
(Statistics of the local KSQL server interaction with the Kafka topic orders)
订单验证流
ksql> describe extended s_ov;
Name : S_OV
Type : STREAM
Timestamp field : Not set - using <ROWTIME>
Key format : KAFKA
Value format : JSON
Kafka topic : order-validations (partitions: 1, replication: 1)
Statement : CREATE STREAM s_ov (OrderValidation STRUCT <OrderValidationResult VARCHAR, OrderValidationType VARCHAR,orderId integer> ) WITH (KAFKA_TOPIC='order-validations', VALUE_FORMAT='JSON');
Field | Type
---------------------------------------------------------------------------------------------------------------------
ORDERVALIDATION | STRUCT<ORDERVALIDATIONRESULT VARCHAR(STRING), ORDERVALIDATIONTYPE VARCHAR(STRING), ORDERID INTEGER>
---------------------------------------------------------------------------------------------------------------------
Queries that read from this STREAM
-----------------------------------
CTAS_T_S_OV_1_155 (RUNNING) : CREATE TABLE T_S_OV_1 WITH (KAFKA_TOPIC='T_S_OV_1', PARTITIONS=1, REPLICAS=1) AS SELECT S_OV.ORDERVALIDATION->ORDERID ID, COUNT(*) TOTAL FROM S_OV S_OV WINDOW TUMBLING ( SIZE 60 SECONDS ) WHERE (S_OV.ORDERVALIDATION->ORDERVALIDATIONRESULT = 'PASS') GROUP BY S_OV.ORDERVALIDATION->ORDERID EMIT CHANGES;
CTAS_T_S_OV_85 (RUNNING) : CREATE TABLE T_S_OV WITH (KAFKA_TOPIC='T_S_OV', PARTITIONS=1, REPLICAS=1) AS SELECT S_OV.ORDERVALIDATION->ORDERID ORDERID, COUNT(*) TOTAL FROM S_OV S_OV WINDOW TUMBLING ( SIZE 60 SECONDS ) WHERE (S_OV.ORDERVALIDATION->ORDERVALIDATIONRESULT = 'PASS') GROUP BY S_OV.ORDERVALIDATION->ORDERID EMIT CHANGES;
For query topology and execution plan please run: EXPLAIN <QueryId>
Local runtime statistics
------------------------
consumer-messages-per-sec: 1.09 consumer-total-bytes: 10836043 consumer-total-messages: 80796 last-message: 2021-04-27T01:39:21.178Z
consumer-failed-messages: 704 consumer-failed-messages-per-sec: 0 last-failed: 2021-04-26T23:58:47.916Z
(Statistics of the local KSQL server interaction with the Kafka topic order-validations)
订单验证表聚合
ksql> describe extended t_s_ov;
Name : T_S_OV
Type : TABLE
Timestamp field : Not set - using <ROWTIME>
Key format : KAFKA
Value format : JSON
Kafka topic : T_S_OV (partitions: 1, replication: 1)
Statement : CREATE TABLE T_S_OV WITH (KAFKA_TOPIC='T_S_OV', PARTITIONS=1, REPLICAS=1) AS SELECT
S_OV.ORDERVALIDATION->ORDERID ORDERID,
COUNT(*) TOTAL
FROM S_OV S_OV
WINDOW TUMBLING ( SIZE 60 SECONDS )
WHERE (S_OV.ORDERVALIDATION->ORDERVALIDATIONRESULT = 'PASS')
GROUP BY S_OV.ORDERVALIDATION->ORDERID
EMIT CHANGES;
Field | Type
------------------------------------------------------------------
ORDERID | INTEGER (primary key) (Window type: TUMBLING)
TOTAL | BIGINT
------------------------------------------------------------------
Queries that write from this TABLE
-----------------------------------
CTAS_T_S_OV_85 (RUNNING) : CREATE TABLE T_S_OV WITH (KAFKA_TOPIC='T_S_OV', PARTITIONS=1, REPLICAS=1) AS SELECT S_OV.ORDERVALIDATION->ORDERID ORDERID, COUNT(*) TOTAL FROM S_OV S_OV WINDOW TUMBLING ( SIZE 60 SECONDS ) WHERE (S_OV.ORDERVALIDATION->ORDERVALIDATIONRESULT = 'PASS') GROUP BY S_OV.ORDERVALIDATION->ORDERID EMIT CHANGES;
For query topology and execution plan please run: EXPLAIN <QueryId>
Local runtime statistics
------------------------
messages-per-sec: 0.09 total-messages: 7730 last-message: 2021-04-27T01:40:16.45Z
(Statistics of the local KSQL server interaction with the Kafka topic T_S_OV)
来自订单验证表的流
ksql> describe extended s_t_s_ov;
Name : S_T_S_OV
Type : STREAM
Timestamp field : Not set - using <ROWTIME>
Key format : KAFKA
Value format : JSON
Kafka topic : T_S_OV (partitions: 1, replication: 1)
Statement : CREATE STREAM s_t_s_ov (id integer KEY, total integer ) WITH (KAFKA_TOPIC='T_S_OV',VALUE_FORMAT='JSON') ;
Field | Type
--------------------------------
ID | INTEGER (key)
TOTAL | INTEGER
--------------------------------
Local runtime statistics
------------------------
messages-per-sec: 0.09 total-messages: 7737 last-message: 2021-04-27T01:41:31.567Z
(Statistics of the local KSQL server interaction with the Kafka topic T_S_OV)
注意:消息总数:7737,但是它不显示任何结果。
我尝试了第3步的另一个版本,即orders id值为空的results。
ksql> CREATE STREAM s_t_s_ov1 (id integer, total integer ) WITH (KAFKA_TOPIC='T_S_OV',VALUE_FORMAT='JSON') ;
>
Message
----------------
Stream created
----------------
ksql> select * from s_t_s_ov1 emit changes limit 3;
+--------------------------------------------------------+--------------------------------------------------------+
|ID |TOTAL |
+--------------------------------------------------------+--------------------------------------------------------+
|null |1 |
|null |1 |
|null |1 |
Limit Reached
Query terminated
注意,我已经从语句中删除了key。
暂无答案!
目前还没有任何答案,快来回答吧!