文章18 | 阅读 11224 | 点赞0
滑动窗口计算
Flink程序开发步骤:
注意:Flink程序是延迟计算的,只有最后调用execute()方法的时候才会真正触发执行程序。
延迟计算好处:可以开发复杂的程序,Flink可以将复杂的程序转成一个Plan,将Plan作为一个整体单元执行!
Maven依赖
<properties>
<scala.version>2.11</scala.version>
<flink.version>1.6.0</flink.version>
</properties>
<!-- flink-java 依赖-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- flink-streaming-java 依赖 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
添加FlatMap实现方法:
通过 Alt + Enter 插入实现方法:
@Override
public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
}
Java完整代码:
package com.Flink;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
/**
* @Author: Henry
* @Description: 滑动窗口计算
* 通过socket模拟产生单词数据,flink对数据进行统计计算
* 需要实现每隔1秒对最近2秒内的数据进行汇总计算
* @Date: Create in 2019/5/3 9:43
**/
public class SocketWindowWordCount {
public static void main(String[] args) throws Exception{
// 设置主机名、分隔符、端口号
String hostname = "localhost" ;
String delimiter = "\n" ;
int port ;
// 使用parameterTool,通过控制台获取参数
try {
ParameterTool parameterTool = ParameterTool.fromArgs(args);
port = parameterTool.getInt("port") ;
}catch (Exception e){
// 如果没有传入参数,则赋默认值
System.out.println("No port set. use default port 9000--java");
port = 9999 ;
}
//1、获取flink的运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2、连接socket获取输入的数据
DataStream<String> text = env.socketTextStream(
hostname,port,delimiter);
// 输入:a a c
// 输出:(即flatMap操作)
// a 1
// a 1
// c 1
//3、transformation操作,对数据流实现计算
// FlatMapFunction<T, O>: T代表输入格式,O代表返回格式
DataStream<WordWithCount> windowCounts = text
// 3.1、将用户输入的文本流以非空白符的方式拆开来,得到单个的单词,
// 存入命名为out的Collector中
.flatMap(new FlatMapFunction<String, WordWithCount>() {
@Override
public void flatMap(String value, Collector<WordWithCount> out)
throws Exception {
String[] splits = value.split("\\s") ; // 通过空白符或制表符切开
for (String word : splits){
out.collect(new WordWithCount(word , 1L)); // 为输出写数据
}
}
})
// 3.2、将输入的文本分为不相交的分区,每个分区包含的都是具有相同key的元素。
// 也就是说,相同的单词被分在了同一个区域,下一步的reduce就是统计分区中的个数
.keyBy("word")
// 3.3、滑动窗口三个字,指定时间窗口大小为2秒,指定时间间隔为1秒
.timeWindow(Time.seconds(2), Time.seconds(1))
// 3.4、一个在KeyedDataStream上“滚动”进行的reduce方法。
// 将上一个reduce过的值和当前element结合,产生新的值并发送出。
// 此处是说,对输入的两个对象进行合并,统计该单词的数量和
// 这里使用 sum 或 reduce 都可以
//.sum("count") ; // 是对 reduce 的封装实现
// reduce 返回类型 SingleOutputStreamOperator,继承了 DataStream
.reduce(new ReduceFunction<WordWithCount>() {
public WordWithCount reduce(WordWithCount a, WordWithCount b) {
return new WordWithCount(a.word, a.count + b.count);
}
});
// 4、把数据打印到控制台并且设置并行度
windowCounts.print().setParallelism(1);
// 这一行代码一定要实现,否则程序不执行
// 报错:Unhandled exception: java.lang.Exception
// 需要对 main 添加异常捕获
env.execute("Socket window count");
}
// 自定义统计单词的数据结构,包含两个变量和三个方法
public static class WordWithCount{
//两个变量存储输入的单词及其数量
public String word ;
public long count ;
// 空参的构造函数
public WordWithCount(){}
// 通过外部传参赋值的构造函数
public WordWithCount(String word, long count){
this.word = word ;
this.count = count ;
}
@Override
// 打印显示 word,count
public String toString() {
return "WordWithCount{" +
"word='" + word + '\'' +
", count=" + count +
'}';
}
}
}
先开启NetCat端口监听:
再运行Run 程序:
注意:
// 注意:必须要添加这一行隐式转行,否则下面的flatmap方法执行会报错
import org.apache.flink.api.scala._
val windowCounts = text.flatMap( .... )
报错信息:
官网解释链接:
官网报错信息解释
完整代码:
package cn.Flink
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
/**
* @Author: Henry
* @Description: 滑动窗口计算
* 通过socket模拟产生单词数据,flink对数据进行统计计算
* 需要实现每隔1秒对最近2秒内的数据进行汇总计算
* @Date: Create in 2019/5/3 14:28
**/
object SocketWindowWordCount {
def main(args: Array[String]): Unit = {
// 设置主机名、分隔符、获取socket端口号
val hostname = "localhost" ;
val delimiter = '\n' ;
val port:Int = try {
ParameterTool.fromArgs(args).getInt("port")
}catch {
case e:Exception => {
System.err.println("No port set. use default port 9000--scala")
}
9999 // 解析失败返回默认值:9999
}
// 1、获取运行环境
val env:StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// 2、连接socket获取输入数据
val text = env.socketTextStream( hostname , port , delimiter)
// 3、解析数据(吧数据打平),分组,窗口计算,并且聚合求sum
// 注意:必须要添加这一行隐式转行,否则下面的flatmap方法执行会报错
// https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/types_serialization.html#type-information-in-the-scala-api
import org.apache.flink.api.scala._
// 3.1、打平,把每一行单词都切开
val windowCounts = text.flatMap( line => line.split("\\s"))
// 3.2、把单词转成( word , 1 )这种形式
.map( word => WordWithCount(word, 1))
// 3.3、按 word 进行分组
.keyBy("word")
// 3.4、指定窗口大小,指定间隔时间
.timeWindow(Time.seconds(2), Time.seconds(1))
// 3.5、reduce 或者 sum 统计求和
//.reduce((a, b) => WordWithCount(a.word , a.count + b.count))
.sum("count")
// 4、打印到控制台
windowCounts.print.setParallelism(1)
// 5、执行任务
env.execute("Socket window count")
}
case class WordWithCount(word: String, count: Long)
}
先开启NetCat端口监听:
再运行Run 程序:
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/hongzhen91/article/details/89812636
内容来源于网络,如有侵权,请联系作者删除!