如何在ksql中选择/分配分区

af7jpaap  于 2021-06-05  发布在  Kafka
关注(0)|答案(1)|浏览(276)

我认为这是一个相关的问题:使用kafka ksql从特定分区中选择一个主题的所有事件,并使用给定的偏移量
如何通过ksql选择/分配分区?我试图阻止ksql从所有分区读取数据,因为必要的数据只存在于一个shard中。
例如:
cli v5.4.1,服务器v5.4.1

SET 'auto.offset.reset'='earliest';
CREATE STREAM SOURCE_STREAM (FIELD_1 BIGINT)
    WITH (
        VALUE_FORMAT='AVRO',
        KAFKA_TOPIC='source_topic',
        PARTITIONS=2,
        REPLICAS=1
    );

插入一些存在于分区0和分区1中的模拟数据(不是真正分配的,例如)

INSERT INTO SOURCE_STREAM (FIELD_1) VALUES (123);  # say in partition 0
INSERT INTO SOURCE_STREAM (FIELD_1) VALUES (456);  # say in partition 1

使用使用者api可以执行以下操作:

consumer.assign(TopicPartition(topic=source_topic, partition=0))
consumer.assign(TopicPartition(topic=source_topic, partition=1))
consumer.get()

但是,对于当前的api,我不确定如何在客户机级别或服务器属性级别“分配”分区。下面的派生流将从所有分区读取:

CREATE STREAM DERIVATIVE_STREAM AS 
    SELECT
        FIELD_1
    FROM SOURCE_STREAM
    EMIT CHANGES;

EXPLAIN CSAS_DERIVATIVE_STREAM_n;

)我知道我可以用 WHERE 语句来筛选数据,但我想显式读取分区0(1)

46scxncf

46scxncf1#

ksqldb不是这样工作的。您可以使用sql来声明所需的内容,而不是声明所需的方式。
正如你在问题中所说的,你可以使用 WHERE 将 predicate 应用于查询,并可以使用 ROWKEY 以消息键值为目标。
我猜rdbms世界中的类似情况对基于成本的优化者来说是一个执行计划的提示。
如果您想将此作为增强请求记录到ksqldb,请在此处执行:https://github.com/confluentinc/ksql/issues/new

相关问题