【02】Flink 之滑动窗口统计单词个数WordCount

x33g5p2x  于2021-12-25 转载在 其他  
字(5.4k)|赞(0)|评价(0)|浏览(809)

1、理论

滑动窗口计算

2、实践开发

Flink程序开发步骤:

  1. 获得一个执行环境
  2. 加载/创建 初始化数据
  3. 指定操作数据的 transaction 算子
  4. 指定把计算好的数据放在哪
  5. 调用execute()触发执行程序
注意:Flink程序是延迟计算的,只有最后调用execute()方法的时候才会真正触发执行程序。
延迟计算好处:可以开发复杂的程序,Flink可以将复杂的程序转成一个Plan,将Plan作为一个整体单元执行!

2.1、Java版本

2.1.2 代码实现

Maven依赖

<properties>
  <scala.version>2.11</scala.version>
  <flink.version>1.6.0</flink.version>
</properties>

<!-- flink-java 依赖-->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-java</artifactId>
  <version>${flink.version}</version>
</dependency>
<!-- flink-streaming-java 依赖 -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java_${scala.version}</artifactId>
  <version>${flink.version}</version>
</dependency>

添加FlatMap实现方法:

通过 Alt + Enter 插入实现方法:

@Override
public void flatMap(String value, Collector<WordWithCount> out) throws Exception {

}

Java完整代码:

package com.Flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
/**
 * @Author: Henry
 * @Description: 滑动窗口计算
 *               通过socket模拟产生单词数据,flink对数据进行统计计算
 *               需要实现每隔1秒对最近2秒内的数据进行汇总计算
 * @Date: Create in 2019/5/3 9:43
 **/
public class SocketWindowWordCount {

    public static void main(String[] args) throws Exception{

        // 设置主机名、分隔符、端口号
        String hostname = "localhost" ;
        String delimiter = "\n" ;
        int port ;
        // 使用parameterTool,通过控制台获取参数
        try {
            ParameterTool parameterTool = ParameterTool.fromArgs(args);
            port = parameterTool.getInt("port") ;
        }catch (Exception e){
            // 如果没有传入参数,则赋默认值
            System.out.println("No port set. use default port 9000--java");
            port = 9999 ;
        }

        //1、获取flink的运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //2、连接socket获取输入的数据
        DataStream<String> text = env.socketTextStream(
                hostname,port,delimiter);

        // 输入:a a c
        // 输出:(即flatMap操作)
        // a 1
        // a 1
        // c 1

        //3、transformation操作,对数据流实现计算
        // FlatMapFunction<T, O>: T代表输入格式,O代表返回格式
        DataStream<WordWithCount> windowCounts = text
            // 3.1、将用户输入的文本流以非空白符的方式拆开来,得到单个的单词,
            // 存入命名为out的Collector中
            .flatMap(new FlatMapFunction<String, WordWithCount>() {
            @Override
            public void flatMap(String value, Collector<WordWithCount> out)
                    throws Exception {
                String[] splits = value.split("\\s") ;  // 通过空白符或制表符切开
                for (String word : splits){
                    out.collect(new WordWithCount(word , 1L));   // 为输出写数据
                }

            }
        })
        // 3.2、将输入的文本分为不相交的分区,每个分区包含的都是具有相同key的元素。
        // 也就是说,相同的单词被分在了同一个区域,下一步的reduce就是统计分区中的个数
        .keyBy("word")
        // 3.3、滑动窗口三个字,指定时间窗口大小为2秒,指定时间间隔为1秒
        .timeWindow(Time.seconds(2), Time.seconds(1))
        // 3.4、一个在KeyedDataStream上“滚动”进行的reduce方法。
        // 将上一个reduce过的值和当前element结合,产生新的值并发送出。
        // 此处是说,对输入的两个对象进行合并,统计该单词的数量和
        // 这里使用 sum 或 reduce 都可以
        //.sum("count") ;  // 是对 reduce 的封装实现
        // reduce 返回类型 SingleOutputStreamOperator,继承了 DataStream
        .reduce(new ReduceFunction<WordWithCount>() {
            public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                return new WordWithCount(a.word, a.count + b.count);
            }
        });

        // 4、把数据打印到控制台并且设置并行度
        windowCounts.print().setParallelism(1);

        // 这一行代码一定要实现,否则程序不执行
        // 报错:Unhandled exception: java.lang.Exception
        // 需要对 main 添加异常捕获
        env.execute("Socket window count");
    }

    // 自定义统计单词的数据结构,包含两个变量和三个方法
    public static class WordWithCount{
        //两个变量存储输入的单词及其数量
        public String word ;
        public long count ;

        // 空参的构造函数
        public  WordWithCount(){}

        // 通过外部传参赋值的构造函数
        public WordWithCount(String word, long count){
            this.word = word ;
            this.count = count ;
        }

        @Override
        // 打印显示 word,count
        public String toString() {
            return "WordWithCount{" +
                    "word='" + word + '\'' +
                    ", count=" + count +
                    '}';
        }
    }
}

2.1.2 运行

先开启NetCat端口监听:

再运行Run 程序:

2.2 Scala版本

2.2.1 代码实现

注意:

// 注意:必须要添加这一行隐式转行,否则下面的flatmap方法执行会报错
import org.apache.flink.api.scala._

val windowCounts = text.flatMap( .... )

报错信息:

官网解释链接:
官网报错信息解释
完整代码:

package cn.Flink

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
/**
  * @Author: Henry
  * @Description: 滑动窗口计算
  *               通过socket模拟产生单词数据,flink对数据进行统计计算
  *               需要实现每隔1秒对最近2秒内的数据进行汇总计算
  * @Date: Create in 2019/5/3 14:28 
  **/
object SocketWindowWordCount {

  def main(args: Array[String]): Unit =  {

    //  设置主机名、分隔符、获取socket端口号
    val hostname = "localhost" ;
    val delimiter = '\n' ;
    val port:Int = try {
      ParameterTool.fromArgs(args).getInt("port")
    }catch {
      case e:Exception => {
        System.err.println("No port set. use default port 9000--scala")
      }
        9999    // 解析失败返回默认值:9999
    }

    //  1、获取运行环境
    val env:StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    //  2、连接socket获取输入数据
    val text = env.socketTextStream( hostname , port , delimiter)

    //  3、解析数据(吧数据打平),分组,窗口计算,并且聚合求sum
    // 注意:必须要添加这一行隐式转行,否则下面的flatmap方法执行会报错
    // https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/types_serialization.html#type-information-in-the-scala-api
    import org.apache.flink.api.scala._

    //  3.1、打平,把每一行单词都切开
    val windowCounts = text.flatMap( line => line.split("\\s"))
      //  3.2、把单词转成( word , 1 )这种形式
      .map( word => WordWithCount(word, 1))
      //  3.3、按 word 进行分组
      .keyBy("word")
      //  3.4、指定窗口大小,指定间隔时间
      .timeWindow(Time.seconds(2), Time.seconds(1))
      //  3.5、reduce 或者 sum 统计求和
      //.reduce((a, b) => WordWithCount(a.word , a.count + b.count))
      .sum("count")

    //  4、打印到控制台
    windowCounts.print.setParallelism(1)
    //  5、执行任务
    env.execute("Socket window count")
  }

  case class WordWithCount(word: String, count: Long)

}

2.1.2 运行

先开启NetCat端口监听:

再运行Run 程序:

相关文章