当我们使用cassandrasink进行字数计算时,我们可以在每个窗口更新一条记录。但是当我们使用stream.print()方法时,它会将所有记录打印到控制台输出。理想情况下,当我们使用cassandra sink时也会发生同样的情况。为什么不发生这种情况?请提供帮助。请查找下面的代码。
DataStream<String> text = env.addSource(
new FlinkKafkaConsumer(Util.KAFKA_STREAM_TOPIC, new SimpleStringSchema(), properties));
// parse the data, group it, window it, and aggregate the counts
DataStream<WordCount> result = text.flatMap(new FlatMapFunction<String, WordCount>() {
@Override
public void flatMap(String value, Collector<WordCount> out) {
// normalize and split the line
String[] words = value.toLowerCase().split(":");
// emit the pairs
for (String word : words) {
if (!word.isEmpty()) {
// Do not accept empty word, since word is defined as primary key in C* table
out.collect(new WordCount(word, 1L));
}
}
}
}).keyBy("word").timeWindow(Time.seconds(10)).reduce(new ReduceFunction<WordCount>() {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public WordCount reduce(WordCount a, WordCount b) {
return new WordCount(a.getWord(), a.getCount() + b.getCount());
}
});
CassandraSink<WordCount> sink = CassandraSink.addSink(result).setHost("127.0.0.1")
.setMapperOptions(() -> new Mapper.Option[] { Mapper.Option.saveNullFields(true) }).build();
sink.name("Cassandra Sink234").disableChaining().setParallelism(1).uid("test");
注:flink版本-1.10.0
暂无答案!
目前还没有任何答案,快来回答吧!