文章18 | 阅读 10846 | 点赞0
聚合事件(比如计数、求和)在流上的工作方式与批处理不同。
比如,对流中的所有元素进行计数是不可能的,因为通常流是无限的(无界的)。所以,流上的聚合需要由 window 来划定范围,比如 “计算过去的5分钟” ,或者 “最后100个元素的和” 。
window是一种可以把无限数据切割为有限数据块的手段
窗口可以是 时间驱动的 【Time Window】(比如:每30秒)或者 数据驱动的【Count Window】 (比如:每100个元素)。
即 Time Window 和 Count Window 都可以实现 滚动窗口 或者 滑动窗口。
官网代码实例如下:
reduce(reduceFunction)
aggregate(aggregateFunction)
sum(),min(),max()
官网reduce(reduceFunction)例子:
官网aggregate(aggregateFunction)例子:
apply(windowFunction)
process(processWindowFunction)
全量聚合状态变化过程-求最大值
官网apply(windowFunction)例子:
官网process(processWindowFunction)例子:
package com.Streaming;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
/**
* @Author: Henry
* @Description: 增量聚合
* @Date: Create in 2019/6/2 15:14
**/
public class SocketDemoIncrAgg {
public static void main(String[] args) throws Exception{
//获取需要的端口号
int port;
try {
ParameterTool parameterTool = ParameterTool.fromArgs(args);
port = parameterTool.getInt("port");
}catch (Exception e){
System.err.println("No port set. use default port 9003--java");
port = 9003;
}
//获取flink的运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
String hostname = "master";
String delimiter = "\n";
//连接socket获取输入的数据
DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter);
DataStream<Tuple2<Integer,Integer>> intData = text.map(
new MapFunction<String, Tuple2<Integer,Integer>>() {
@Override
public Tuple2<Integer,Integer> map(String value) throws Exception {
return new Tuple2<>(1,Integer.parseInt(value));
}
});
intData.keyBy(0)
.timeWindow(Time.seconds(5))
.reduce(new ReduceFunction<Tuple2<Integer, Integer>>() {
@Override
public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> value1,
Tuple2<Integer, Integer> value2)
throws Exception {
System.out.println("执行reduce操作:"+value1+","+value2);
return new Tuple2<>(value1.f0,value1.f1+value2.f1);
}
}).print();
//这一行代码一定要实现,否则程序不执行
env.execute("Socket window count");
}
}
每过来一条数据进行一次计算:
在这里插入代码片
package com.Streaming;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
/**
* @Author: Henry
* @Description: window 增量聚合
* @Date: Create in 2019/6/2 16:11
**/
public class SocketDemoFullCount {
public static void main(String[] args) throws Exception{
//获取需要的端口号
int port;
try {
ParameterTool parameterTool = ParameterTool.fromArgs(args);
port = parameterTool.getInt("port");
}catch (Exception e){
System.err.println("No port set. use default port 9003--java");
port = 9003;
}
//获取flink的运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
String hostname = "master";
String delimiter = "\n";
//连接socket获取输入的数据
DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter);
DataStream<Tuple2<Integer,Integer>> intData = text.map(
new MapFunction<String, Tuple2<Integer,Integer>>() {
@Override
public Tuple2<Integer,Integer> map(String value) throws Exception {
return new Tuple2<>(1,Integer.parseInt(value));
}
});
intData.keyBy(0)
.timeWindow(Time.seconds(5))
.process(
new ProcessWindowFunction<Tuple2<Integer,Integer>, String, Tuple, TimeWindow>() {
@Override
public void process(Tuple key, Context context,
Iterable<Tuple2<Integer, Integer>> elements,
Collector<String> out)
throws Exception {
System.out.println("执行process。。。");
long count = 0;
for(Tuple2<Integer,Integer> element: elements){
count++;
}
out.collect("window:"+context.window()+",count:"+count);
}
}).print();
//这一行代码一定要实现,否则程序不执行
env.execute("Socket window count");
}
}
统计接收到的数据数量:
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/hongzhen91/article/details/90768517
内容来源于网络,如有侵权,请联系作者删除!