Kafka Streams: implementing a simple key-value store where I can put and get data

ca1c2owp · asked 2021-06-06 · in Kafka

I have a Kafka Streams application that operates on incoming events and needs to store state before writing to the next topic. The write should only happen after the state has been updated in the local store. Something like this:

stream.map(this::getAndUpdateState)
          .map(this::processStateAndEvent)
          .to("topicname");

So inside getAndUpdateState() I could do something like:

state = store.get(key); // or new if null
state = updateState(state, event);  // update changes to state
store.put(key, state);  // write back the state
return state;

How can I implement simple get() and put() operations on a Kafka state store? I have already tried a KeyValueStore, but ran into problems because I had to add a source and a sink processor and so on.
Alternatively, getting and putting the data via a KTable or some other Kafka concept would also be fine (a sketch of that alternative is shown below).
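For reference, the KTable route mentioned above is usually expressed as an aggregation rather than explicit get()/put() calls, since Kafka Streams then manages the backing store itself. A minimal sketch, assuming String events whose lengths are summed per key; the input topic name "events" is a placeholder, not from the question:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

StreamsBuilder builder = new StreamsBuilder();
// the state lives in a store that Kafka Streams manages behind the KTable;
// no manual get()/put() is needed
KTable<String, Long> state = builder.<String, String>stream("events")
        .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
        .aggregate(
                () -> 0L,                                  // initial state for an unseen key
                (key, event, agg) -> agg + event.length(), // updateState(state, event)
                Materialized.with(Serdes.String(), Serdes.Long()));
// the updated state is emitted downstream only after the store has been updated
state.toStream().to("topicname", Produced.with(Serdes.String(), Serdes.Long()));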


oiopk7p5 · Answer 1

Thanks to user152468 and Matthias J. Sax for the suggestions.
I was able to do this with the transform() method in Kafka Streams. The complete working code, based on the original Pipe example, is given below.
Pipe.java:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class Pipe{
    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();

        //  setting Configs
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // initializing a StreamsBuilder for building the topology
        final StreamsBuilder builder = new StreamsBuilder();
        // creating a KStream that continuously reads records from the source Kafka topic "streams-plaintext-input"
        KStream<String, String> source = builder.stream("streams-plaintext-input");

        StoreBuilder<KeyValueStore<String, Long>> wordCountsStore = Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("WordCountsStore"),
                Serdes.String(),
                Serdes.Long())
                .withCachingEnabled();

        builder.addStateStore(wordCountsStore);

        source.map((k, v) -> KeyValue.pair("key", v))
                .peek((k, s) -> System.out.printf("After keying: %s, value: %s\n", k, s))
                .transform(new SampleTransformSupplier(wordCountsStore.name()), wordCountsStore.name())
                .peek((k, s) -> System.out.printf("After transform: %s, value: %s\n", k, s));
        // writing the original (untransformed) source stream to another Kafka topic "streams-pipe-output";
        // the transformed branch above is a dangling branch: it still executes and updates
        // the state store, but its records are not written to any topic
        source.to("streams-pipe-output");
        // generating the topology
        final Topology topology = builder.build();
        System.out.print(topology.describe());

        // constructing a streams client with the properties and topology
        final KafkaStreams streams = new KafkaStreams(topology, properties);
        final CountDownLatch latch = new CountDownLatch(1);

        // attaching shutdown handler
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run(){
                streams.close();
                latch.countDown();
            }
        });
        try{
            streams.start();
            latch.await();
        } catch (Throwable e){
            System.exit(1);
        }
        System.exit(0);
    }

    private static class SampleTransformSupplier implements TransformerSupplier<String, String, KeyValue<String, String>> {

        final private String stateStoreName;

        public SampleTransformSupplier(String stateStoreName) {
            this.stateStoreName = stateStoreName;
        }

        @Override
        public Transformer<String, String, KeyValue<String, String>> get() {
            return new Transformer<String, String, KeyValue<String, String>>() {

                private KeyValueStore<String, Long> stateStore;

                @SuppressWarnings("unchecked")
                @Override
                public void init(ProcessorContext processorContext) {
                    stateStore = (KeyValueStore<String, Long>) processorContext.getStateStore(stateStoreName);
                }

                @Override
                public KeyValue<String, String> transform(String key, String value) {
                    Long countSoFar = stateStore.get(key);
                    if(countSoFar == null){
                        System.out.print("Initializing count so far. this message should be printed only once");
                        countSoFar = 0L;
                    }
                    countSoFar += value.length();
                    System.out.printf(" Key: %s, Value: %s, Count: %d\n\n", key, value, countSoFar);
                    stateStore.put(key, countSoFar);
                    return KeyValue.pair(key, value);
                }

                @Override
                public void close() {
                    // No need to close the store here; its lifecycle is managed by Kafka Streams.
                }
            };
        }
    }
}
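As a side note on the "simple get()" part of the question: while the application is running, the same store can also be read from outside the topology via Kafka Streams interactive queries. A minimal sketch, assuming the streams instance from Pipe.java above and a Kafka Streams version that has StoreQueryParameters (2.5 or newer):

import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

// after streams.start(), once the instance has reached the RUNNING state:
ReadOnlyKeyValueStore<String, Long> view = streams.store(
        StoreQueryParameters.fromNameAndType("WordCountsStore",
                QueryableStoreTypes.<String, Long>keyValueStore()));
Long countSoFar = view.get("key"); // read-only view; all writes still go through the Transformer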

bq3bfh9z · Answer 2

It sounds like you want to do batch processing. Kafka Streams is a stream processing library: all processors run in parallel/concurrently, forming a data pipeline.
I guess you could still use transform() with an attached state store, emit nothing downstream, and only put the data into the store. You could then schedule a wall-clock-time punctuation that scans the whole store and emits everything in it downstream (see the sketch after this answer). Overall, though, this seems to be an anti-pattern.
The part that is hardest to reason about is when the state is "fully loaded": because a topic is by definition/conceptually infinite, loading the state never "finishes".
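For completeness, a minimal sketch of the punctuation idea described above. The class name and the 30-second interval are assumptions for illustration; it uses the same Transformer/ProcessorContext API as the code in the first answer:

import java.time.Duration;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

// Accumulates state in the store on every record, emits nothing per record,
// and flushes the whole store downstream on a wall-clock schedule.
public class BatchingTransformer implements Transformer<String, String, KeyValue<String, String>> {

    private final String stateStoreName;
    private KeyValueStore<String, Long> stateStore;

    public BatchingTransformer(String stateStoreName) {
        this.stateStoreName = stateStoreName;
    }

    @SuppressWarnings("unchecked")
    @Override
    public void init(ProcessorContext context) {
        stateStore = (KeyValueStore<String, Long>) context.getStateStore(stateStoreName);
        // every 30 seconds of wall-clock time, scan the whole store and emit its contents downstream
        context.schedule(Duration.ofSeconds(30), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
            try (KeyValueIterator<String, Long> it = stateStore.all()) {
                while (it.hasNext()) {
                    KeyValue<String, Long> entry = it.next();
                    context.forward(entry.key, entry.value.toString());
                }
            }
        });
    }

    @Override
    public KeyValue<String, String> transform(String key, String value) {
        // only update the store; returning null emits nothing for this record
        Long countSoFar = stateStore.get(key);
        stateStore.put(key, (countSoFar == null ? 0L : countSoFar) + value.length());
        return null;
    }

    @Override
    public void close() { }
}

It would be wired into the topology exactly like the SampleTransformSupplier above, e.g. .transform(() -> new BatchingTransformer(storeName), storeName).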
