我认为这是一个相关的问题:使用kafka ksql从特定分区中选择一个主题的所有事件,并使用给定的偏移量
如何通过ksql选择/分配分区?我试图阻止ksql从所有分区读取数据,因为必要的数据只存在于一个shard中。
例如:
cli v5.4.1,服务器v5.4.1
SET 'auto.offset.reset'='earliest';
CREATE STREAM SOURCE_STREAM (FIELD_1 BIGINT)
WITH (
VALUE_FORMAT='AVRO',
KAFKA_TOPIC='source_topic',
PARTITIONS=2,
REPLICAS=1
);
插入一些存在于分区0和分区1中的模拟数据(不是真正分配的,例如)
INSERT INTO SOURCE_STREAM (FIELD_1) VALUES (123); # say in partition 0
INSERT INTO SOURCE_STREAM (FIELD_1) VALUES (456); # say in partition 1
使用使用者api可以执行以下操作:
consumer.assign(TopicPartition(topic=source_topic, partition=0))
consumer.assign(TopicPartition(topic=source_topic, partition=1))
consumer.get()
但是,对于当前的api,我不确定如何在客户机级别或服务器属性级别“分配”分区。下面的派生流将从所有分区读取:
CREATE STREAM DERIVATIVE_STREAM AS
SELECT
FIELD_1
FROM SOURCE_STREAM
EMIT CHANGES;
EXPLAIN CSAS_DERIVATIVE_STREAM_n;
)我知道我可以用 WHERE
语句来筛选数据,但我想显式读取分区0(1)
1条答案
按热度按时间46scxncf1#
ksqldb不是这样工作的。您可以使用sql来声明所需的内容,而不是声明所需的方式。
正如你在问题中所说的,你可以使用
WHERE
将 predicate 应用于查询,并可以使用ROWKEY
以消息键值为目标。我猜rdbms世界中的类似情况对基于成本的优化者来说是一个执行计划的提示。
如果您想将此作为增强请求记录到ksqldb,请在此处执行:https://github.com/confluentinc/ksql/issues/new