我正在试用apache flink,为了从学习中检验我的知识,我正在玩经典的单词计数问题。
这是我的密码:
public class TestWordCount {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<String> addSource = env.addSource(new TestSource());
DataStream<Tuple2<String, Integer>> sum = addSource
.flatMap(new Tokenizer())
.keyBy(0)
.sum(1);
sum.print();
env.execute();
}
}
class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
private static final long serialVersionUID = 1L;
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
for(String part: value.split(" "))
out.collect(new Tuple2<>(part.toLowerCase(), 1));
}
}
class TestSource implements SourceFunction<String> {
private static final long serialVersionUID = 1L;
String s = "Hadoop is the Elephant King! A yellow and elegant thing. He never forgets. The Useful data, or lets An extraneous element cling!";
@Override
public void run(SourceContext<String> ctx) throws Exception {
ctx.collect(s);
}
@Override
public void cancel() {
}
}
当我运行它时,输出是这样的:
(hadoop,1)(is,1)(the,1)(大象,1)(国王!,1) (a,1)(黄色,1)(和,1)(优雅,1)(事物,1)(他,1)(从不,1)(忘记,1)(这个,2)(有用,1)(数据,1)(或,1)(让,1)(安,1)(无关,1)(元素,1)(粘着!)!,1)
我只是好奇,为什么 the
来了两次 (the,1)
以及 (the,2)
?
我们将非常感谢您的帮助。
2条答案
按热度按时间xwmevbvl1#
为什么要做两次?
我相信你已经发了两次“the”。(the,1)是发送第一个“the”时的计数,(the,2)是发送第二个“the”时的计数。
每次它接收到一个元素并输出它时,sum都会聚合数据。
nmpmafwu2#
在处理数据流时,输入是无限制的,因此不可能等到“结束”才打印出结果。“最终报告”的概念毫无意义。所以到目前为止,你得到的是一个不断更新的结果流。