I've been trying to build a fairly simple use case with Kafka/ksqlDB, with no luck so far. I've rewritten this example as a more generic use case. What I have is pagestats
data being updated for a given page in a source database (MongoDB in this case). The data consists of:
page name
hits
state
My requirement is simple: I need a continuously updating aggregate stream of SUM(hits) across all pages, grouped by state, feeding my target (Memcached in this case).
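To make the intent concrete, here is a minimal sketch in plain Python (hypothetical data and names) of the aggregate I want ksqlDB to maintain continuously: the sum of hits per state, over the latest version of each page document.

```python
# Sketch only: the grouped sum I expect ksqlDB to keep up to date.
from collections import defaultdict

def sum_hits_by_state(pages):
    """Return {state: total hits} over the latest version of each page."""
    totals = defaultdict(int)
    for page in pages.values():  # pages keyed by _id, latest value wins
        totals[page["state"]] += page["hits"]
    return dict(totals)

# Hypothetical current state of the collection:
pages = {
    "id-1": {"pagename": "page-1", "state": "ON", "hits": 5},
    "id-2": {"pagename": "page-2", "state": "ON", "hits": 8},
    "id-3": {"pagename": "page-1", "state": "SK", "hits": 2},
}
print(sum_hits_by_state(pages))  # {'ON': 13, 'SK': 2}
```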
I have the MongoDB Kafka connector streaming data from MongoDB into a Kafka topic, using change streams on the collection.
Creating the connector:
curl -X PUT http://praveensrao:8083/connectors/pagestats001/config \
-H "Content-Type: application/json" -d '{
"connector.class":"com.mongodb.kafka.connect.MongoSourceConnector",
"publish.full.document.only":"true",
"tasks.max":"1",
"change.stream.full.document":"default",
"collection":"pagestats001",
"key.converter.schemas.enable":"true",
"database":"test",
"name":"pagestats001",
"connection.uri":"mongodb://***:***@****:27017/test?replicaSet=****&authSource=admin&ssl=true",
"value.converter.schemas.enable":"false",
"copy.existing":"false",
"value.converter":"org.apache.kafka.connect.storage.StringConverter",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"errors.tolerance": "all",
"max.message.bytes": "16777216"
}'
I insert documents into the MongoDB collection pagestats001
, and the messages land on the Kafka topic test.pagestats001
like this:
ksql> print "test.pagestats001" from beginning;
Key format: JSON or SESSION(KAFKA_STRING) or HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 2020/11/16 03:33:29.780 Z, key: {"_id": {"_data": {"$binary": "gl+x8wgAAAADRjxfaWQAPDZkNmQ3YTFhLWU0NGQtNDI2Yy1iMzMzLTAwYjkzNDY0ZDQ0NAAAWhAEyYMCANg0R8K7k9TJ3Qfd3gQ=", "$type": "00"}}}, value: {"_id": "6d6d7a1a-e44d-426c-b333-00b93464d444", "pagename": "page-1", "state": "ON", "hits": 5}
rowtime: 2020/11/16 03:34:43.297 Z, key: {"_id": {"_data": {"$binary": "gl+x81EAAAAIRjxfaWQAPGVhZGNjMjMxLTNlZmQtNGMzNy05ZGEzLWQ4MGY3ZTgzOGUzNwAAWhAEyYMCANg0R8K7k9TJ3Qfd3gQ=", "$type": "00"}}}, value: {"_id": "eadcc231-3efd-4c37-9da3-d80f7e838e37", "pagename": "page-2", "state": "ON", "hits": 8}
rowtime: 2020/11/16 03:35:07.565 Z, key: {"_id": {"_data": {"$binary": "gl+x82UAAAAwRjxfaWQAPGJhNTE4YTQ5LWVhODEtNDY1Zi04ZWY3LWI3ZDZkMzk4ZmYyYwAAWhAEyYMCANg0R8K7k9TJ3Qfd3gQ=", "$type": "00"}}}, value: {"_id": "ba518a49-ea81-465f-8ef7-b7d6d398ff2c", "pagename": "page-1", "state": "SK", "hits": 2}
rowtime: 2020/11/16 03:35:13.709 Z, key: {"_id": {"_data": {"$binary": "gl+x824AAAABRjxfaWQAPDgwOWZkZDNlLTkxMmQtNDcwNC05NmE0LTIyMWE5OWI0ZTkzZQAAWhAEyYMCANg0R8K7k9TJ3Qfd3gQ=", "$type": "00"}}}, value: {"_id": "809fdd3e-912d-4704-96a4-221a99b4e93e", "pagename": "page-2", "state": "SK", "hits": 7}
Since the record key isn't usable for a ksqlDB table, following some online documentation I repartition by the `_id` taken from the message value, rather than using the key:
CREATE STREAM test_pagestats001_schema (
_id VARCHAR,
pagename VARCHAR,
state VARCHAR,
hits BIGINT
) WITH (
KAFKA_TOPIC='test.pagestats001',
VALUE_FORMAT='JSON'
);
Message
----------------
Stream created
----------------
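As I understand it, this stream declaration simply projects the declared columns out of each record's JSON value, roughly like this per record (illustrative sketch, value taken from the topic output above):

```python
# Sketch: how one topic message's JSON value maps onto the stream's columns.
import json

raw_value = ('{"_id": "6d6d7a1a-e44d-426c-b333-00b93464d444", '
             '"pagename": "page-1", "state": "ON", "hits": 5}')
doc = json.loads(raw_value)
# Project only the columns declared in test_pagestats001_schema:
row = {col: doc.get(col) for col in ("_id", "pagename", "state", "hits")}
print(row["state"], row["hits"])  # ON 5
```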
CREATE STREAM test_pagestats001_stream
WITH (KAFKA_TOPIC='test.pagestats001.byid') AS
SELECT
_id,
pagename,
state,
hits
FROM test_pagestats001_schema
PARTITION BY _id
EMIT CHANGES;
Message
--------------------------------------------------------
Created query with ID CSAS_TEST_PAGESTATS001_STREAM_77
--------------------------------------------------------
CREATE TABLE test_pagestats001_byid (
_id VARCHAR PRIMARY KEY,
pagename VARCHAR,
state VARCHAR,
hits BIGINT
) WITH (
KAFKA_TOPIC='test.pagestats001.byid',
VALUE_FORMAT='JSON'
);
Message
---------------
Table created
---------------
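My understanding of the PARTITION BY / table combination (a conceptual sketch, not ksqlDB internals): each record is re-keyed by its document `_id`, so the table keeps only the latest value per `_id` (upsert semantics), and a later record for the same document overwrites the earlier one.

```python
# Sketch of re-keying by _id and materializing a table with upsert semantics.
def rekey_by_id(records):
    return [(r["_id"], r) for r in records]

def materialize_table(keyed_records):
    table = {}
    for key, value in keyed_records:  # same key => later record overwrites
        table[key] = value
    return table

records = [
    {"_id": "doc-1", "state": "AB", "hits": 1},
    {"_id": "doc-1", "state": "AB", "hits": 8},  # update to the same document
]
table = materialize_table(rekey_by_id(records))
print(table["doc-1"]["hits"])  # 8
```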
CREATE TABLE test_pagestats001_bystate AS
SELECT
state,
sum(hits) as hits
FROM test_pagestats001_byid
GROUP BY state
emit changes;
Message
---------------------------------------------------------
Created query with ID CTAS_TEST_PAGESTATS001_BYSTATE_81
---------------------------------------------------------
Now I run the following:
ksql> select * from test_pagestats001_bystate emit changes;
Say I insert one record for state "AB" into the database; on the source topic test.pagestats001
we get:
rowtime: 2020/11/16 04:07:48.290 Z, key: {"_id": {"_data": {"$binary": "gl+x+xAAAAABRjxfaWQAPDYxNzg0OGVlLTUxNDctNDczZS04MjU3LTE0NzA5ZDg2YzZjOQAAWhAEyYMCANg0R8K7k9TJ3Qfd3gQ=", "$type": "00"}}}, value: {"_id": "617848ee-5147-473e-8257-14709d86c6c9", "pagename": "page-1", "state": "AB", "hits": 2}
and on the table test_pagestats001_bystate
we get:
ksql> select * from test_pagestats001_bystate emit changes;
+-------------------------+-----------------------+
|STATE |HITS |
+-------------------------+-----------------------+
|AB |2 |
Now, if I insert a record for "page-2" with the same state and hits (1), the following event is emitted:
|AB |3 |
So far so good. But here comes the problem. Say I update the same record I just inserted, setting hits to (8). Then I get two events:
|AB |2 |
|AB |10 |
instead of a single event with the aggregated value (10). In other words, I did not expect an event with the aggregate value (2). This seems like a basic use case to me. Am I doing something wrong?
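My working theory for the two events (a sketch of my understanding, not ksqlDB source code): when the table's value for an `_id` changes, the aggregate first subtracts the old hits (a retraction) and then adds the new hits, and with EMIT CHANGES both intermediate results can be pushed downstream.

```python
# Sketch: subtract-old-then-add-new changelog semantics for a grouped SUM.
def apply_change(table, totals, record, emitted):
    _id, state, hits = record["_id"], record["state"], record["hits"]
    old = table.get(_id)
    if old is not None:  # retract the previous value for this _id first
        totals[old["state"]] -= old["hits"]
        emitted.append((old["state"], totals[old["state"]]))
    table[_id] = record  # then apply the new value
    totals[state] = totals.get(state, 0) + hits
    emitted.append((state, totals[state]))

table, totals, emitted = {}, {}, []
apply_change(table, totals, {"_id": "a", "state": "AB", "hits": 2}, emitted)
apply_change(table, totals, {"_id": "b", "state": "AB", "hits": 1}, emitted)
apply_change(table, totals, {"_id": "b", "state": "AB", "hits": 8}, emitted)
print(emitted)  # [('AB', 2), ('AB', 3), ('AB', 2), ('AB', 10)]
```

This reproduces the exact sequence I observed: the (2) event is the retraction of the old row, followed by (10) once the new value is added.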
ksql> version
Version: 6.0.0