Use case
The goal is to inspect each incoming event/row and determine whether it is a new row or an update. New rows should go to one topic and updated rows to another.
Approach: use a lookup table (KTable) and perform two join operations. 1. An inner join to detect updates. 2. A left join, filtering for a null right-table key, to detect inserts/new rows. Create two streams from the results of those two joins, then run INSERT INTO queries on the streams to write the records back into the lookup table.
Steps to reproduce the issue (takes about 7 minutes):
Step 1. docker-compose up
This brings up the latest Confluent 5.1.0 platform on Docker.
Step 2. docker ps
Note: make sure the broker is up. The broker frequently goes down on my machine.
Step 3.
In a new terminal, open a bash session into the Schema Registry container (keeping this terminal open makes it easy to monitor things):
docker run -it --net=cp-all-in-one_default --rm confluentinc/cp-schema-registry:5.1.0 bash
Step 4.
Create the lookup table. The topic is LOAD.TEST.LOCAL.LOOKUP.TABLE. My schema has a key of type string. Three sample records are shown below. First populate the lookup table with these 3 dummy records.
kafka-avro-console-producer --broker-list broker:9092 --topic LOAD.TEST.LOCAL.LOOKUP.TABLE \
--property schema.registry.url=http://schema-registry:8081 \
--property parse.key=true \
--property key.separator=, \
--property key.schema='{"type":"string"}' \
--property value.schema='{"name":"LOAD.TEST.LOCAL.LOOKUP.TABLE","type":"record","namespace":"example.sender.batch","fields":[{"name":"SENDER_CODE","type":"string"},{"name":"SENDER_NAME","type":"string"},{"name":"SENDER_CATEGORY_CODE","type":"string"},{"name":"SENDER_AGENCY_CODE","type":"string"},{"name":"SENDER_SUB_AGENCY_CODE","type":"string"},{"name":"SENDER_FOREIGN_IND","type":"string"},{"name":"SENDER_FOREIGN_COUNTRY","type":"string"},{"name":"SENDER_NAME_ALTERNATE","type":"string"},{"name":"PARENT_SENDER_CODE","type":"string"},{"name":"CHANGE_DATE","type":"string"},{"name":"REQUESTING_LOCATION","type":"string"},{"name":"REQUEST_DATE","type":"string"},{"name":"REPLACEMENT_SENDER_CODE","type":"string"},{"name":"SENDER_STATUS","type":"string"},{"name":"SENDER_DUNS","type":"string"},{"name":"ADDRESSLINE1","type":"string"},{"name":"ADDRESSLINE2","type":"string"},{"name":"ADDRESSLINE3","type":"string"},{"name":"ADDRESS4","type":"string"},{"name":"CITY","type":"string"},{"name":"STATE","type":"string"},{"name":"POSTAL_CODE","type":"string"},{"name":"URL","type":"string"},{"name":"SENDER_ACRONYM","type":"string"},{"name":"DEACTIVATED_DATE","type":"string"},{"name":"Kafka_TimeEvent","type":"string"}]}'
Now you can insert the records below. Just paste the 3 records. If you press return more than once and get an exception, simply run the same command above again and paste the records, pressing return once after each.
"SVI6FQ",{"SENDER_CODE":"SVI6FQ","SENDER_NAME":"SENDER SAM II","SENDER_CATEGORY_CODE":"5","SENDER_AGENCY_CODE":"","SENDER_SUB_AGENCY_CODE":"","SENDER_FOREIGN_IND":"","SENDER_FOREIGN_COUNTRY":"","SENDER_NAME_ALTERNATE":"","PARENT_SENDER_CODE":"5","CHANGE_DATE":"2018-09-27","REQUESTING_LOCATION":"","REQUEST_DATE":"","REPLACEMENT_SENDER_CODE":"","SENDER_STATUS":"","SENDER_DUNS":"","ADDRESSLINE1":"373 ELAN VILLAGE LANE","ADDRESSLINE2":"APPARTMENT 972","ADDRESSLINE3":"MAILBOXB","ADDRESS4":"","CITY":"SAN JOSE","STATE":"CA","POSTAL_CODE":"95134","URL":"","SENDER_ACRONYM":"","DEACTIVATED_DATE":"","Kafka_TimeEvent":"2018-09-27"}
"SVI6FR",{"SENDER_CODE":"SVI6FR","SENDER_NAME":"SENDER SAM II","SENDER_CATEGORY_CODE":"5","SENDER_AGENCY_CODE":"","SENDER_SUB_AGENCY_CODE":"","SENDER_FOREIGN_IND":"","SENDER_FOREIGN_COUNTRY":"","SENDER_NAME_ALTERNATE":"","PARENT_SENDER_CODE":"5","CHANGE_DATE":"2018-09-27","REQUESTING_LOCATION":"","REQUEST_DATE":"","REPLACEMENT_SENDER_CODE":"","SENDER_STATUS":"","SENDER_DUNS":"","ADDRESSLINE1":"374 ELAN VILLAGE LANE","ADDRESSLINE2":"APPARTMENT 973","ADDRESSLINE3":"MAILBOXC","ADDRESS4":"","CITY":"SAN JOSE","STATE":"CA","POSTAL_CODE":"95134","URL":"","SENDER_ACRONYM":"","DEACTIVATED_DATE":"","Kafka_TimeEvent":"2018-09-27"}
"SVI6FN",{"SENDER_CODE":"SVI6FN","SENDER_NAME":"SENDER SAM II","SENDER_CATEGORY_CODE":"5","SENDER_AGENCY_CODE":"","SENDER_SUB_AGENCY_CODE":"","SENDER_FOREIGN_IND":"","SENDER_FOREIGN_COUNTRY":"","SENDER_NAME_ALTERNATE":"","PARENT_SENDER_CODE":"5","CHANGE_DATE":"2018-09-27","REQUESTING_LOCATION":"","REQUEST_DATE":"","REPLACEMENT_SENDER_CODE":"","SENDER_STATUS":"","SENDER_DUNS":"","ADDRESSLINE1":"372 ELAN VILLAGE LANE","ADDRESSLINE2":"APPARTMENT 972","ADDRESSLINE3":"MAILBOXA","ADDRESS4":"","CITY":"SAN JOSE","STATE":"CA","POSTAL_CODE":"95134","URL":"","SENDER_ACRONYM":"","DEACTIVATED_DATE":"","Kafka_TimeEvent":"2018-09-27"}
Press ⌘+C to exit.
Step 5. In another terminal, open the KSQL CLI:
docker run --network cp-all-in-one_default --interactive --tty --rm confluentinc/cp-ksql-cli:latest http://ksql-server:8088
Step 6. Create a KTable:
create table load_test_local_lookup_table with (KAFKA_TOPIC='LOAD.TEST.LOCAL.LOOKUP.TABLE',VALUE_FORMAT='AVRO',KEY='SENDER_CODE');
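To verify the table registered as expected, you can inspect it from the KSQL CLI (standard commands, shown here only as a suggested sanity check):
```
show tables;
describe load_test_local_lookup_table;
```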
Step 7. Make sure to set the following property so that you can see results from the earliest offset. Run this in KSQL:
ksql> SET 'auto.offset.reset'='earliest';
You will see the following message: Successfully changed local property 'auto.offset.reset' from 'null' to 'earliest'
Step 8. Now create the topic that your events will be streamed into. Use the Schema Registry bash from step 4. Also populate this topic with the same records as the lookup topic.
kafka-avro-console-producer --broker-list broker:9092 --topic LOAD.TEST.LOCAL.EVENT.STREAM \
--property schema.registry.url=http://schema-registry:8081 \
--property parse.key=true \
--property key.separator=, \
--property key.schema='{"type":"string"}' \
--property value.schema='{"name":"LOAD.TEST.LOCAL.EVENT.STREAM","type":"record","namespace":"example.sender.batch","fields":[{"name":"SENDER_CODE","type":"string"},{"name":"SENDER_NAME","type":"string"},{"name":"SENDER_CATEGORY_CODE","type":"string"},{"name":"SENDER_AGENCY_CODE","type":"string"},{"name":"SENDER_SUB_AGENCY_CODE","type":"string"},{"name":"SENDER_FOREIGN_IND","type":"string"},{"name":"SENDER_FOREIGN_COUNTRY","type":"string"},{"name":"SENDER_NAME_ALTERNATE","type":"string"},{"name":"PARENT_SENDER_CODE","type":"string"},{"name":"CHANGE_DATE","type":"string"},{"name":"REQUESTING_LOCATION","type":"string"},{"name":"REQUEST_DATE","type":"string"},{"name":"REPLACEMENT_SENDER_CODE","type":"string"},{"name":"SENDER_STATUS","type":"string"},{"name":"SENDER_DUNS","type":"string"},{"name":"ADDRESSLINE1","type":"string"},{"name":"ADDRESSLINE2","type":"string"},{"name":"ADDRESSLINE3","type":"string"},{"name":"ADDRESS4","type":"string"},{"name":"CITY","type":"string"},{"name":"STATE","type":"string"},{"name":"POSTAL_CODE","type":"string"},{"name":"URL","type":"string"},{"name":"SENDER_ACRONYM","type":"string"},{"name":"DEACTIVATED_DATE","type":"string"},{"name":"Kafka_TimeEvent","type":"string"}]}'
"SVI6FQ",{"SENDER_CODE":"SVI6FQ","SENDER_NAME":"SENDER SAM II","SENDER_CATEGORY_CODE":"5","SENDER_AGENCY_CODE":"","SENDER_SUB_AGENCY_CODE":"","SENDER_FOREIGN_IND":"","SENDER_FOREIGN_COUNTRY":"","SENDER_NAME_ALTERNATE":"","PARENT_SENDER_CODE":"5","CHANGE_DATE":"2018-09-27","REQUESTING_LOCATION":"","REQUEST_DATE":"","REPLACEMENT_SENDER_CODE":"","SENDER_STATUS":"","SENDER_DUNS":"","ADDRESSLINE1":"373 ELAN VILLAGE LANE","ADDRESSLINE2":"APPARTMENT 972","ADDRESSLINE3":"MAILBOXB","ADDRESS4":"","CITY":"SAN JOSE","STATE":"CA","POSTAL_CODE":"95134","URL":"","SENDER_ACRONYM":"","DEACTIVATED_DATE":"","Kafka_TimeEvent":"2018-09-27"}
"SVI6FR",{"SENDER_CODE":"SVI6FR","SENDER_NAME":"SENDER SAM II","SENDER_CATEGORY_CODE":"5","SENDER_AGENCY_CODE":"","SENDER_SUB_AGENCY_CODE":"","SENDER_FOREIGN_IND":"","SENDER_FOREIGN_COUNTRY":"","SENDER_NAME_ALTERNATE":"","PARENT_SENDER_CODE":"5","CHANGE_DATE":"2018-09-27","REQUESTING_LOCATION":"","REQUEST_DATE":"","REPLACEMENT_SENDER_CODE":"","SENDER_STATUS":"","SENDER_DUNS":"","ADDRESSLINE1":"374 ELAN VILLAGE LANE","ADDRESSLINE2":"APPARTMENT 973","ADDRESSLINE3":"MAILBOXC","ADDRESS4":"","CITY":"SAN JOSE","STATE":"CA","POSTAL_CODE":"95134","URL":"","SENDER_ACRONYM":"","DEACTIVATED_DATE":"","Kafka_TimeEvent":"2018-09-27"}
"SVI6FN",{"SENDER_CODE":"SVI6FN","SENDER_NAME":"SENDER SAM II","SENDER_CATEGORY_CODE":"5","SENDER_AGENCY_CODE":"","SENDER_SUB_AGENCY_CODE":"","SENDER_FOREIGN_IND":"","SENDER_FOREIGN_COUNTRY":"","SENDER_NAME_ALTERNATE":"","PARENT_SENDER_CODE":"5","CHANGE_DATE":"2018-09-27","REQUESTING_LOCATION":"","REQUEST_DATE":"","REPLACEMENT_SENDER_CODE":"","SENDER_STATUS":"","SENDER_DUNS":"","ADDRESSLINE1":"372 ELAN VILLAGE LANE","ADDRESSLINE2":"APPARTMENT 972","ADDRESSLINE3":"MAILBOXA","ADDRESS4":"","CITY":"SAN JOSE","STATE":"CA","POSTAL_CODE":"95134","URL":"","SENDER_ACRONYM":"","DEACTIVATED_DATE":"","Kafka_TimeEvent":"2018-09-27"}
Step 9.
Create a stream over this event topic:
create stream load_test_local_event_stream with (KAFKA_TOPIC='LOAD.TEST.LOCAL.EVENT.STREAM',VALUE_FORMAT='AVRO',KEY='SENDER_CODE');
Step 10.
Derive a stream from a stream-table join; it will be used to detect rows that already exist. Call it the update stream. The topic created for this stream will contain only updates. That matters for my use case: the messages I have to filter out are the updates.
create stream load_test_update_stream as select event.* FROM load_test_local_event_stream event JOIN load_test_local_lookup_table lookup ON event.sender_code = lookup.sender_Code;
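To watch this join while you produce events, a plain SELECT works from the CLI; note the EVENT_ column prefix that the event.* projection produces (a suggested verification query, not part of the pipeline):
```
select EVENT_SENDER_CODE, EVENT_SENDER_NAME from load_test_update_stream;
```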
Step 11.
Create a stream over the same topic that backs the lookup table. That way, when you want to update the lookup table, you can insert into this stream instead (if I'm not mistaken, you cannot insert from a stream directly into a KTable). So do this:
create stream load_test_lookup_feed_stream with (KAFKA_TOPIC='LOAD.TEST.LOCAL.LOOKUP.TABLE',VALUE_FORMAT='AVRO',KEY='SENDER_CODE');
Step 12. Run an INSERT INTO query. It inserts into the lookup-table feed stream, which in turn updates the lookup table whenever a message becomes available in the update stream.
Insert into load_test_lookup_feed_stream select EVENT_SENDER_CODE AS SENDER_CODE, EVENT_SENDER_NAME AS SENDER_NAME, EVENT_SENDER_CATEGORY_CODE AS SENDER_CATEGORY_CODE , EVENT_SENDER_AGENCY_CODE AS SENDER_AGENCY_CODE , EVENT_SENDER_SUB_AGENCY_CODE AS SENDER_SUB_AGENCY_CODE, EVENT_SENDER_FOREIGN_IND AS SENDER_FOREIGN_IND, EVENT_SENDER_FOREIGN_COUNTRY AS SENDER_FOREIGN_COUNTRY , EVENT_SENDER_NAME_ALTERNATE AS SENDER_NAME_ALTERNATE, EVENT_PARENT_SENDER_CODE AS PARENT_SENDER_CODE ,EVENT_CHANGE_DATE AS CHANGE_DATE, EVENT_REQUESTING_LOCATION AS REQUESTING_LOCATION , EVENT_REQUEST_DATE AS REQUEST_DATE, EVENT_REPLACEMENT_SENDER_CODE AS REPLACEMENT_SENDER_CODE , EVENT_SENDER_STATUS AS SENDER_STATUS, EVENT_SENDER_DUNS AS SENDER_DUNS , EVENT_ADDRESSLINE1 AS ADDRESSLINE1 , EVENT_ADDRESSLINE2 AS ADDRESSLINE2, EVENT_ADDRESSLINE3 AS ADDRESSLINE3 , EVENT_ADDRESS4 AS ADDRESS4 , EVENT_CITY AS CITY , EVENT_STATE AS STATE, EVENT_POSTAL_CODE AS POSTAL_CODE, EVENT_URL AS URL, EVENT_SENDER_ACRONYM AS SENDER_ACRONYM , EVENT_DEACTIVATED_DATE AS DEACTIVATED_DATE, EVENT_KAFKA_TIMEEVENT AS KAFKA_TIMEEVENT from load_test_update_stream partition by SENDER_CODE ;
Problem: although this does update my lookup table, the row lands as a new record, not as an update. To reproduce this issue, follow step 15a.
Step 13. Very similar to the update stream, create a stream to detect new records among the events:
create stream load_test_insert_stream as select event.* FROM load_test_local_event_stream event left JOIN load_test_local_lookup_table lookup ON event.sender_code = lookup.sender_Code where lookup.sender_Code is null ;
Verification: you can optionally run a SELECT query to see what is going on. If your Schema Registry bash is still open, try inserting a new record with a key of your own (as in step 15a). The new message will show up in this stream.
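For example (same EVENT_ prefix as before; a suggested check only):
```
select EVENT_SENDER_CODE, EVENT_SENDER_NAME from load_test_insert_stream;
```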
Step 14. Create an INSERT INTO query as before. This one inserts back into the lookup table; the lookup table is then populated with the new message:
Insert into load_test_lookup_feed_stream select EVENT_SENDER_CODE AS SENDER_CODE, EVENT_SENDER_NAME AS SENDER_NAME, EVENT_SENDER_CATEGORY_CODE AS SENDER_CATEGORY_CODE , EVENT_SENDER_AGENCY_CODE AS SENDER_AGENCY_CODE , EVENT_SENDER_SUB_AGENCY_CODE AS SENDER_SUB_AGENCY_CODE, EVENT_SENDER_FOREIGN_IND AS SENDER_FOREIGN_IND, EVENT_SENDER_FOREIGN_COUNTRY AS SENDER_FOREIGN_COUNTRY , EVENT_SENDER_NAME_ALTERNATE AS SENDER_NAME_ALTERNATE, EVENT_PARENT_SENDER_CODE AS PARENT_SENDER_CODE ,EVENT_CHANGE_DATE AS CHANGE_DATE, EVENT_REQUESTING_LOCATION AS REQUESTING_LOCATION , EVENT_REQUEST_DATE AS REQUEST_DATE, EVENT_REPLACEMENT_SENDER_CODE AS REPLACEMENT_SENDER_CODE , EVENT_SENDER_STATUS AS SENDER_STATUS, EVENT_SENDER_DUNS AS SENDER_DUNS , EVENT_ADDRESSLINE1 AS ADDRESSLINE1 , EVENT_ADDRESSLINE2 AS ADDRESSLINE2, EVENT_ADDRESSLINE3 AS ADDRESSLINE3 , EVENT_ADDRESS4 AS ADDRESS4 , EVENT_CITY AS CITY , EVENT_STATE AS STATE, EVENT_POSTAL_CODE AS POSTAL_CODE, EVENT_URL AS URL, EVENT_SENDER_ACRONYM AS SENDER_ACRONYM , EVENT_DEACTIVATED_DATE AS DEACTIVATED_DATE, EVENT_KAFKA_TIMEEVENT AS KAFKA_TIMEEVENT from load_test_insert_stream partition by SENDER_CODE ;
Step 15.
The problem, and how to reproduce it.
Step 15a. How to insert a new sample record
Run the command from step 8 (with its schema). Insert/paste the new record below. Note that I have changed both the message key and the sender code; the message key and the row key should always match. For example: "SVI6FW","SENDER_CODE":"SVI6FW
```
"SVI6FW",{"SENDER_CODE":"SVI6FW","SENDER_NAME":"SENDER SAM II","SENDER_CATEGORY_CODE":"5","SENDER_AGENCY_CODE":"","SENDER_SUB_AGENCY_CODE":"","SENDER_FOREIGN_IND":"","SENDER_FOREIGN_COUNTRY":"","SENDER_NAME_ALTERNATE":"","PARENT_SENDER_CODE":"5","CHANGE_DATE":"2018-09-27","REQUESTING_LOCATION":"","REQUEST_DATE":"","REPLACEMENT_SENDER_CODE":"","SENDER_STATUS":"","SENDER_DUNS":"","ADDRESSLINE1":"373 ELAN VILLAGE LANE","ADDRESSLINE2":"APPARTMENT 972","ADDRESSLINE3":"MAILBOXB","ADDRESS4":"","CITY":"SAN JOSE","STATE":"CA","POSTAL_CODE":"95134","URL":"","SENDER_ACRONYM":"","DEACTIVATED_DATE":"","Kafka_TimeEvent":"2018-09-27"}
```
Step 15b. How to update a sample record
Very similar to the new-record insert in 15a, but use the same message key and only change the name or some other value. For example, `'SAM II'` becomes `'SAM III'`:
```
"SVI6FW",{"SENDER_CODE":"SVI6FW","SENDER_NAME":"SENDER SAM III","SENDER_CATEGORY_CODE":"5","SENDER_AGENCY_CODE":"","SENDER_SUB_AGENCY_CODE":"","SENDER_FOREIGN_IND":"","SENDER_FOREIGN_COUNTRY":"","SENDER_NAME_ALTERNATE":"","PARENT_SENDER_CODE":"5","CHANGE_DATE":"2018-09-27","REQUESTING_LOCATION":"","REQUEST_DATE":"","REPLACEMENT_SENDER_CODE":"","SENDER_STATUS":"","SENDER_DUNS":"","ADDRESSLINE1":"373 ELAN VILLAGE LANE","ADDRESSLINE2":"APPARTMENT 972","ADDRESSLINE3":"MAILBOXB","ADDRESS4":"","CITY":"SAN JOSE","STATE":"CA","POSTAL_CODE":"95134","URL":"","SENDER_ACRONYM":"","DEACTIVATED_DATE":"","Kafka_TimeEvent":"2018-09-27"}
```
The problem: as you can see, my lookup table is not getting updated; it treats every message as a new message, even when it is sent with the same key. As a result, I cannot detect updates: every single message shows up as new.
You can test this as follows:
Send a new message with your own key (15a). It shows up in load_test_insert_stream.
Send an updated message with the same key, as in 15b. It should show up in load_test_update_stream, but it shows up in load_test_insert_stream instead. The lookup table treats it as a new message.
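One more way to see it, suggested here as a diagnostic: tail the table itself from the CLI while you send the same key twice; each arrival prints as if it were a brand-new row:
```
select ROWKEY, SENDER_CODE, SENDER_NAME from load_test_local_lookup_table;
```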
Any new approaches/suggestions are welcome!
1 Answer
I'm assuming your design is basically something like this:
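(The code block from the original answer did not survive the page extraction. What follows is my hypothetical reconstruction in ksqlDB 0.11 syntax: INPUT, EXISTING, INSERTS, and UPDATES are placeholder names, and the two-column schema is a stand-in.)
```
-- Hypothetical reconstruction; names and schema are placeholders.
-- Input events, keyed by ID:
CREATE STREAM INPUT (ID STRING KEY, NAME STRING)
  WITH (kafka_topic='input', value_format='JSON', partitions=1);

-- Table of already-seen keys, built over the topic that INSERTS writes to:
CREATE TABLE EXISTING (ID STRING PRIMARY KEY, NAME STRING)
  WITH (kafka_topic='INSERTS', value_format='JSON', partitions=1);

-- New rows: the left join finds no match in EXISTING;
-- the results land on the topic backing the EXISTING table.
CREATE STREAM INSERTS WITH (kafka_topic='INSERTS') AS
  SELECT INPUT.ID ID, INPUT.NAME NAME
  FROM INPUT
  LEFT JOIN EXISTING ON INPUT.ID = EXISTING.ID
  WHERE EXISTING.ID IS NULL;

-- Updates: the inner join matches only rows already in EXISTING.
CREATE STREAM UPDATES AS
  SELECT INPUT.ID ID, INPUT.NAME NAME
  FROM INPUT
  JOIN EXISTING ON INPUT.ID = EXISTING.ID;
```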
Then you insert some records:
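(Again a guess at the lost snippet, using the placeholder schema from above:)
```
INSERT INTO INPUT (ID, NAME) VALUES ('1', 'a');
INSERT INTO INPUT (ID, NAME) VALUES ('2', 'b');
INSERT INTO INPUT (ID, NAME) VALUES ('1', 'a-updated');
```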
You would expect the first two rows to land in the INSERTS stream and the last row in the UPDATES stream. I've tested the above on ksqlDB version 0.11 and it does work.... kind of.
If you insert each record one at a time from the CLI, the output is as expected. But if you insert all three rows at once, e.g. by running them on a single line in the CLI:
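(i.e., something like this, pasted as one line; again my guess at the lost snippet:)
```
INSERT INTO INPUT (ID, NAME) VALUES ('1', 'a'); INSERT INTO INPUT (ID, NAME) VALUES ('2', 'b'); INSERT INTO INPUT (ID, NAME) VALUES ('1', 'a-updated');
```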
then all three rows end up in the INSERTS stream. Why, you may ask? TL;DR: the solution is brittle. If an update arrives close in time to its insert, it does not work.
There is a race condition in the design. If the join processes the second input row before the output of the first row has been produced to the INSERTS topic and polled back by the join, then the EXISTING table will not yet include that row, and the second row will wrongly be sent to INSERTS instead of UPDATES. There are some configs you can play with to see whether you can make this work for your use case:
Setting max.task.idle.ms to a higher value makes the join wait longer for data to appear on the table side of the join. But it will not help if the update and the insert happen within the same millisecond, and raising it hurts throughput and latency.
Setting cache.max.bytes.buffering to zero turns off buffering in the Streams library, which may help.
Setting linger.ms to zero means the Kafka producer will not delay sending messages.
Even with all of these, the system is asynchronous and your results may vary. If updates never occur close to their insert, the system works. But if an update can arrive close to the insert, you may find it wrongly classified as an insert.
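For reference, these can be set per session from the ksqlDB CLI; if I read the config passthrough correctly, the ksql.streams. prefix forwards Kafka Streams configs and the nested producer. prefix forwards producer configs (the values below are only illustrative):
```
SET 'ksql.streams.max.task.idle.ms'='2000';
SET 'ksql.streams.cache.max.bytes.buffering'='0';
SET 'ksql.streams.producer.linger.ms'='0';
```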