Update problem when using Flink's CassandraSink

Posted by 7gcisfzg on 2021-06-26 in Flink

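When we use the CassandraSink for a word count, only a single record gets updated per window. When we use stream.print() instead, all records are printed to the console. Ideally the same thing should happen with the Cassandra sink. Why doesn't it? Please help; the code is below.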

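import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

import com.datastax.driver.mapping.Mapper;

// env, properties, Util and WordCount are defined elsewhere in the application
DataStream<String> text = env.addSource(
        new FlinkKafkaConsumer<>(Util.KAFKA_STREAM_TOPIC, new SimpleStringSchema(), properties));

// parse the data, group it, window it, and aggregate the counts
DataStream<WordCount> result = text.flatMap(new FlatMapFunction<String, WordCount>() {
    @Override
    public void flatMap(String value, Collector<WordCount> out) {
        // normalize and split the line on ':'
        String[] words = value.toLowerCase().split(":");

        // emit the pairs
        for (String word : words) {
            if (!word.isEmpty()) {
                // Do not accept empty word, since word is defined as primary key in C* table
                out.collect(new WordCount(word, 1L));
            }
        }
    }
}).keyBy("word").timeWindow(Time.seconds(10)).reduce(new ReduceFunction<WordCount>() {
    private static final long serialVersionUID = 1L;

    @Override
    public WordCount reduce(WordCount a, WordCount b) {
        // sum the counts of the same word within one 10-second window
        return new WordCount(a.getWord(), a.getCount() + b.getCount());
    }
});

// write every windowed result to Cassandra; WordCount is mapped with the DataStax object mapper
CassandraSink<WordCount> sink = CassandraSink.addSink(result)
        .setHost("127.0.0.1")
        .setMapperOptions(() -> new Mapper.Option[] { Mapper.Option.saveNullFields(true) })
        .build();

sink.name("Cassandra Sink234").disableChaining().setParallelism(1).uid("test");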
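To make the pipeline's output concrete, here is a plain-Java simulation (no Flink APIs) of what the flatMap, keyBy("word"), 10-second timeWindow and reduce chain emits. It assumes tumbling windows aligned to multiples of 10 s and treats each event's timestamp as the time it is processed (Flink 1.10 defaults to processing time); the class WindowedWordCountSim and its sample events are hypothetical. What it illustrates is that the chain emits one result per word per window, so a word that appears in several windows is emitted several times.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java simulation (no Flink APIs) of the flatMap -> keyBy -> 10 s window -> reduce chain.
// Event timestamps stand in for processing time; all names here are hypothetical.
public class WindowedWordCountSim {

    static final long WINDOW_SIZE_MS = 10_000L; // corresponds to Time.seconds(10)

    /** A hypothetical input line and the time (ms) at which it is processed. */
    public static final class Event {
        public final long timestampMs;
        public final String line;

        public Event(long timestampMs, String line) {
            this.timestampMs = timestampMs;
            this.line = line;
        }
    }

    /** One emitted record: window start, word and its count within that window. */
    public static final class WindowResult {
        public final long windowStartMs;
        public final String word;
        public final long count;

        public WindowResult(long windowStartMs, String word, long count) {
            this.windowStartMs = windowStartMs;
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return "[" + windowStartMs + "] " + word + "=" + count;
        }
    }

    public static List<WindowResult> run(List<Event> events) {
        // windowStart -> (word -> count); TreeMaps keep the output order deterministic
        Map<Long, Map<String, Long>> windows = new TreeMap<>();
        for (Event e : events) {
            // tumbling windows aligned to multiples of the window size
            long windowStart = e.timestampMs - (e.timestampMs % WINDOW_SIZE_MS);
            for (String word : e.line.toLowerCase().split(":")) {
                if (!word.isEmpty()) { // same empty-word filter as the flatMap
                    windows.computeIfAbsent(windowStart, k -> new TreeMap<>())
                            .merge(word, 1L, Long::sum); // plays the role of the ReduceFunction
                }
            }
        }
        List<WindowResult> out = new ArrayList<>();
        for (Map.Entry<Long, Map<String, Long>> w : windows.entrySet()) {
            for (Map.Entry<String, Long> c : w.getValue().entrySet()) {
                out.add(new WindowResult(w.getKey(), c.getKey(), c.getValue()));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event(1_000, "a:b"),
                new Event(4_000, "A"),
                new Event(12_000, "a"));
        // prints "[0] a=2", "[0] b=1" and "[10000] a=1" on separate lines
        run(events).forEach(System.out::println);
    }
}
```

Here the word "a" is emitted once for the window starting at 0 and again for the window starting at 10000, which is the kind of repetition print() makes visible.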

Note: Flink version 1.10.0
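A likely explanation, consistent with the comment in the code that word is the primary key of the C* table: print() outputs every record the window emits, whereas a Cassandra INSERT behaves as an upsert, so each new window result for a word overwrites the row written for that word by the previous window. The plain-Java sketch below (no Flink or Cassandra APIs) simulates both behaviors, with an in-memory map standing in for the table. The class UpsertVsPrintSim, the windowEndMs field and the (word, window end) key variant are hypothetical; the original WordCount POJO has no window field, so keeping one row per window would assume adding such a field and a matching key column to the table.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation (no Flink or Cassandra APIs): print() appends every record,
// while a table keyed by primary key overwrites existing rows (upsert semantics).
public class UpsertVsPrintSim {

    /** One windowed result; windowEndMs is a hypothetical field not present in the original POJO. */
    public static final class WordCount {
        public final String word;
        public final long count;
        public final long windowEndMs;

        public WordCount(String word, long count, long windowEndMs) {
            this.word = word;
            this.count = count;
            this.windowEndMs = windowEndMs;
        }

        @Override
        public String toString() {
            return word + "=" + count + "@" + windowEndMs;
        }
    }

    /** Like print(): every record reaches the console. */
    public static List<String> printSink(List<WordCount> stream) {
        List<String> console = new ArrayList<>();
        for (WordCount wc : stream) {
            console.add(wc.toString());
        }
        return console;
    }

    /** Table with PRIMARY KEY (word): a later window overwrites the earlier row. */
    public static Map<String, Long> keyedByWord(List<WordCount> stream) {
        Map<String, Long> table = new LinkedHashMap<>();
        for (WordCount wc : stream) {
            table.put(wc.word, wc.count); // put() replaces the value for an existing key
        }
        return table;
    }

    /** Hypothetical table keyed by (word, window end): one row per word per window. */
    public static Map<String, Long> keyedByWordAndWindow(List<WordCount> stream) {
        Map<String, Long> table = new LinkedHashMap<>();
        for (WordCount wc : stream) {
            // composite key encoded as a string for illustration only
            table.put(wc.word + "|" + wc.windowEndMs, wc.count);
        }
        return table;
    }

    public static void main(String[] args) {
        List<WordCount> stream = Arrays.asList(
                new WordCount("flink", 3, 10_000),
                new WordCount("flink", 5, 20_000));
        System.out.println(printSink(stream));            // [flink=3@10000, flink=5@20000]
        System.out.println(keyedByWord(stream));          // {flink=5}
        System.out.println(keyedByWordAndWindow(stream)); // {flink|10000=3, flink|20000=5}
    }
}
```

A LinkedHashMap is used only so that the printed order matches the insertion order.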
