kstream globalktable join能否为特定搜索返回多个匹配记录?

odopli94  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(345)

我希望有人能帮我解决一个关于Kafkaglobalktables的问题。
我正在尝试执行kstream globalktable联接。但是,我希望检索globalktable中的所有条目,这些条目的键或值包含在原始流事件中找到的字符串。例如,假设我的表有3行,其中包含以下键:

Key: BANK055DEPOSIT value: {some data}
 Key: BANK055CREDIT value: {different data}
 Key: BANK033CREDIT value: {more different data}

当我对表执行连接以检索数据时,我需要收回其键或值包含“055”的所有行。所以我想要前两排。
在数据库世界中,这相当于以下内容:

SELECT * FROM GlobalKTable where table_key.contains("055”) OR table_value.contains(“055”)

我浏览了官方文件,没有找到如何做到这一点的例子。我怀疑从globalktable联接中检索n个行是不可能的。
另外,我使用streams dsl来实现这个。不确定使用处理器api是否可以实现这一点。任何意见都将不胜感激!

cpjpxq1n

cpjpxq1n1#

当加入 KStream 用一个 GlobalKTable ,可以使用部分键和值 KStream, 但最终必须与整个 GlobalKTable 所以,不幸的是,你不能像上面所说的那样使用join。
但是,即使使用dsl,您也应该能够做到这一点。如果你用过 KStream.transformValues 用一个 ValueTransformerWithKeySupplier ,您可以扫描statestore并根据流记录中包含的子字符串提取所需的记录。此外,您不一定需要扫描整个存储,而是使用范围查询。
编辑:这里有一些代码,我得到的工作,以证明我得到了什么。

@SuppressWarnings("unchecked")
public class MultiResultJoinExample {

    public static void main(String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "mult-partial-key-join-results");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        final StreamsBuilder builder = new StreamsBuilder();

        final String storeName = "kv-store";
        final StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
                Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(storeName),
                        Serdes.String(),
                        Serdes.String());
        builder.addStateStore(keyValueStoreBuilder);

        final KStream<String, String> streamToJoinAgainst = builder.stream("to-join-input", Consumed.with(Serdes.String(), Serdes.String() ));

        streamToJoinAgainst.transformValues(new StoringValueTransformer(storeName), storeName);

        final KStream<String, String> streamNeedingJoin = builder.stream("need-join-input", Consumed.with(Serdes.String(), Serdes.String()));

        streamNeedingJoin.flatTransformValues(new FlatMapJoiningTransformer(storeName), storeName).to("output", Produced.with(Serdes.String(), Serdes.String()));

        final KafkaStreams streams = new KafkaStreams(builder.build(props), props);
        streams.start();
    }

    static final class FlatMapJoiningTransformer implements ValueTransformerWithKeySupplier<String, String, Iterable<String>> {
        final String storeName;

        public FlatMapJoiningTransformer(String storeName) {
            this.storeName = storeName;
        }

        @Override
        public ValueTransformerWithKey<String, String, Iterable<String>> get() {
            return new ValueTransformerWithKey<String, String, Iterable<String>>() {
               private KeyValueStore<String, String> kvStore;
                @Override
                public void init(ProcessorContext<Void, Void> context) {
                    kvStore = (KeyValueStore<String, String>) context.getStateStore(storeName);
                }

                @Override
                public Iterable<String> transform(String readOnlyKey, String value) {
                      List<String> results = new ArrayList<>();
                      final String patternToMatch = readOnlyKey.substring(4, 7);
                      try (KeyValueIterator<String, String> iter =  kvStore.all()) {
                           while(iter.hasNext()) {
                               final KeyValue<String, String> kv = iter.next();
                                if (kv.key.contains(patternToMatch) || kv.value.contains(patternToMatch)){
                                    results.add(kv.value + " - " + value);
                                }
                           }
                      }
                      return results;
                }

                @Override
                public void close() {

                }
            };
        }
    }

    static final class StoringValueTransformer implements ValueTransformerWithKeySupplier<String, String, String> {
        final String storeName;

        public StoringValueTransformer(String storeName) {
            this.storeName = storeName;
        }

        @Override
        public ValueTransformerWithKey<String, String, String> get() {
            return new ValueTransformerWithKey<String, String, String>(){
                private KeyValueStore<String, String> kvStore;
                @Override
                public void init(ProcessorContext<Void, Void> context) {
                       kvStore = (KeyValueStore<String, String>)context.getStateStore(storeName);
                }

                @Override
                public String transform(String readOnlyKey, String value) {
                    kvStore.putIfAbsent(readOnlyKey, value);
                    return value;
                }

                @Override
                public void close() {
                     //no-op
                }
            };
        }
    }
}

嗯,比尔

相关问题