【10】Flink 之 DataSet API(四):Broadcast & Accumulators & Counters

x33g5p2x  于2021-12-25 转载在 其他  
字(9.9k)|赞(0)|评价(0)|浏览(422)

1.1、介绍

  • 广播变量允许编程人员在每台机器上保持1个只读的缓存变量,而不是传送变量的副本给tasks
  • 广播变量创建后,它可以运行在集群中的任何function上,而不需要多次传递给集群节点。另外,不应该修改广播变量,这样才能确保每个节点获取到的值都是一致的
  • 一句话解释,可以理解为是一个公共的共享变量,可以把一个dataset 数据集广播出去,然后不同的task在节点上都能够获取到,这个数据在每个节点上只会存在一份。如果不使用broadcast,则在每个节点中的每个task中都需要拷贝一份dataset数据集,比较浪费内存(也就是一个节点中可能会存在多份dataset数据)。

如下图:

假设,存在一个Flink Job并行度为8,在两台机器上运行,每台机器上有4个并行度,map算子计算需要的数据集为set,通常情况下需要维护8分份,每个map需要一份。而在使用广播变量之后,每个机器上只用维护一份数据集,map使用的时候直接去机器节点上去取。

1.2、用法

1:初始化数据

DataSet<Integer> toBroadcast = env.fromElements(1, 2, 3)

2:广播数据

.withBroadcastSet(toBroadcast, "broadcastSetName");

3:获取数据

Collection<Integer> broadcastSet = getRuntimeContext().getBroadcastVariable("broadcastSetName");
  • 注意:
    1:广播出去的变量存在于每个节点的内存中,所以这个数据集不能太大。因为广播出去的数据,会常驻内存,除非程序执行结束
    2:广播变量在初始化广播出去以后不支持修改,这样才能保证每个节点的数据都是一致的。

1.3、DataStreaming 中的Broadcast

  • 把元素广播给所有的分区,数据会被重复处理(使用较少)
  • 类似于storm中的allGrouping
dataStream.broadcast()

1.3.1 代码实现

package com.Streaming.StreamAPI;

import com.Streaming.custormSource.MyNoParalleSource;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * @Author: Henry
 * @Description: broadcast分区规则
 * @Date: Create in 2019/5/12 13:58
 **/
public class StreamingDemoWithMyNoPralalleSourceBroadcast {

    public static void main(String[] args) throws Exception {
        //获取Flink的运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        //获取数据源
        DataStreamSource<Long> text = env.addSource(new MyNoParalleSource())
                .setParallelism(1);

  //      DataStream<Long> num = text.map(new MapFunction<Long, Long>() {  // 每个数据打印一次
        DataStream<Long> num = text.broadcast().map(new MapFunction<Long, Long>() {  // 每个数据打印并行度个数次
            @Override
            public Long map(Long value) throws Exception {
                long id = Thread.currentThread().getId();
                System.out.println("线程id:"+id+",接收到数据:" + value);
                return value;
            }
        });

        //每2秒钟处理一次数据
        DataStream<Long> sum = num.timeWindowAll(Time.seconds(2)).sum(0);
        //打印结果
        sum.print().setParallelism(1);

        String jobName = StreamingDemoWithMyNoPralalleSourceBroadcast.class.getSimpleName();
        env.execute(jobName);
    }
}

1.3.2 运行结果

  1. 未设置broadcast()运行结果

  1. 设置broadcast()运行结果

这种不常使用,通过所说广播变量值的是Broadcast,即下面是对广播变量Broadcast的运行对比。

1.4、广播变量Broadcast实践

需求:
  flink会从数据源中获取到用户的姓名
  最终需要把用户的姓名和年龄信息打印出来

分析:
  所以就需要在中间的map处理的时候获取用户的年龄信息

建议:
  把用户的关系数据集使用广播变量进行处理

注意:
  如果多个算子需要使用同一份数据集,
  那么需要在对应的多个算子后面分别注册广播变量

1.4.1、Java代码实现

package com.Batch.BatchAPI;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

/**
 * @Author: Henry
 * @Description: broadcast广播变量:类似于公共的共享变量
 *
 * @Date: Create in 2019/5/26 21:28
 **/

public class BatchDemoBroadcast {

    public static void main(String[] args) throws Exception{

        //获取运行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        //1:准备需要广播的数据
        ArrayList<Tuple2<String, Integer>> broadData = new ArrayList<>();
        broadData.add(new Tuple2<>("zs",18));
        broadData.add(new Tuple2<>("ls",20));
        broadData.add(new Tuple2<>("ww",17));
        DataSet<Tuple2<String, Integer>> tupleData = env.
                fromCollection(broadData);

        //1.1:处理需要广播的数据,把数据集转换成map类型,
        DataSet<HashMap<String, Integer>> toBroadcast = tupleData
                .map(new MapFunction<Tuple2<String, Integer>, HashMap<String, Integer>>() {
            @Override
            public HashMap<String, Integer> map(Tuple2<String, Integer> value)
                    throws Exception {
                HashMap<String, Integer> res = new HashMap<>();
                res.put(value.f0, value.f1);
                return res;
            }
        });
 
        //源数据
        DataSource<String> data = env.fromElements("zs", "ls", "ww");

        DataSet<String> result = data.map(
                new RichMapFunction<String, String>() {

            List<HashMap<String, Integer>> broadCastMap = new ArrayList<>();
            HashMap<String, Integer> allMap = new HashMap<>();

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                //3:获取广播数据
                this.broadCastMap = getRuntimeContext()
                        .getBroadcastVariable("broadCastMapName");
                // 将 List 的数据转换一下,存到 map 中
                for (HashMap map : broadCastMap) {
                    allMap.putAll(map);
                }
            }

            @Override
            public String map(String name) throws Exception {
                Integer age = allMap.get(name);
                return name + "," + age;
            }
          //2:执行广播数据的操作
        }).withBroadcastSet(toBroadcast, "broadCastMapName");

        result.print();
    }
}

1.4.2、Scala代码实现

package cn.Batch

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration

import scala.collection.mutable.ListBuffer

/**
  * @Author: Henry
  * @Description: broadcast 广播变量操作
  * @Date: Create in 2019/5/16 21:28
  **/
object BatchDemoBroadcast {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    
    import org.apache.flink.api.scala._
    //1: 准备需要广播的数据
    val broadData = ListBuffer[Tuple2[String,Int]]()
    broadData.append(("zs",18))
    broadData.append(("ls",20))
    broadData.append(("ww",17))

    //1.1处理需要广播的数据
    val tupleData = env.fromCollection(broadData)
    val toBroadcastData = tupleData.map(tup=>{
      Map(tup._1->tup._2)
    })

    val text = env.fromElements("zs","ls","ww")

    val result = text.map(new RichMapFunction[String,String] {

      var listData: java.util.List[Map[String,Int]] = _  // java中的list
      var allMap  = Map[String,Int]()

      override def open(parameters: Configuration): Unit = {
        super.open(parameters)
        this.listData = getRuntimeContext
          .getBroadcastVariable[Map[String,Int]]("broadcastMapName")
        val it = listData.iterator()
        while (it.hasNext){
          val next = it.next()
          allMap = allMap.++(next)  // 相当于Java里的 allMap.putAll(map)
        }
      }
      override def map(name: String) = {
        val age = allMap.get(name).get
        name+","+age
      }
    }).withBroadcastSet(toBroadcastData,"broadcastMapName")
    
  result.print()
  }
}

1.4.3、运行结果

2.1、Accumulators & Counters

  • Accumulator即累加器,与Mapreduce counter的应用场景差不多,都能很好地观察task在运行期间的数据变化
  • 可以在Flink job任务中的算子函数中操作累加器,但是只能在任务执行结束之后才能获得累加器的最终结果。
  • Counter是一个具体的累加器(Accumulator)实现
    IntCounter, LongCounter 和 DoubleCounter
  • 用法
      1. 创建累加器
private IntCounter numLines = new IntCounter();
  1. 注册累加器
getRuntimeContext().addAccumulator("num-lines", this.numLines);
  1. 使用累加器
this.numLines.add(1);
  1. 获取累加器的结果
myJobExecutionResult.getAccumulatorResult("num-lines");

2.2、Broadcast和Accumulators的区别

  • Broadcast(广播变量)允许程序员将一个只读的变量缓存在每台机器上,而不用在任务之间传递变量。广播变量可以进行共享,但是不可以进行修改
  • Accumulators(累加器)是可以在不同任务中对同一个变量进行累加操作

2.3、Java代码实现

package com.Batch.BatchAPI;

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

/**
 * @Author: Henry
 * @Description: 全局累加器,counter 计数器
 *
 * @Date: Create in 2019/5/26 21:28
 **/
public class BatchDemoCounter {

    public static void main(String[] args) throws Exception{

        //获取运行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSource<String> data = env.fromElements("a", "b", "c", "d");

        DataSet<String> result = data.map(
                new RichMapFunction<String, String>() {

            //1:创建累加器
           private IntCounter numLines = new IntCounter();

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                //2:注册累加器
                getRuntimeContext().addAccumulator(
                        "num-lines",this.numLines);

            }

            //int sum = 0;
            @Override
            public String map(String value) throws Exception {
                //如果并行度为1,使用普通的累加求和即可,但是设置多个并行度,
                // 则普通的累加求和结果就不准了
                //sum++;
                //System.out.println("sum:"+sum);
                //3:使用累加器
                this.numLines.add(1);
                return value;
            }
        }).setParallelism(8);

        //result.print();

        result.writeAsText("E:\\IdeaProject\\DemoFlink\\data\\count10");

        JobExecutionResult jobResult = env.execute("counter");

        //4:获取累加器
        int num = jobResult.getAccumulatorResult("num-lines");
        System.out.println("num:"+num);
    }
}

2.4、Scala代码实现

package cn.Batch

import org.apache.flink.api.common.accumulators.IntCounter
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration

/**
  * @Author: Henry
  * @Description: counter 全局累加器 操作
  * @Date: Create in 2019/5/26 21:28
  **/
object BatchDemoCounter {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment

    import org.apache.flink.api.scala._

    val data = env.fromElements("a","b","c","d")

    val res = data.map(new RichMapFunction[String,String] {
      //1:定义累加器
      val numLines = new IntCounter

      override def open(parameters: Configuration): Unit = {
        super.open(parameters)
        //2:注册累加器
        getRuntimeContext.addAccumulator("num-lines",this.numLines)
      }

      override def map(value: String) = {
        //3:使用累加器
        this.numLines.add(1)
        value
      }
    }).setParallelism(4)

    res.writeAsText("E:\\IdeaProject\\DemoFlink\\data\\count20")
    val jobResult = env.execute("BatchDemoCounterScala")
    //4:获取累加器
    val num = jobResult.getAccumulatorResult[Int]("num-lines")
    println("num:"+num)

  }
}

2.5、运行结果

  1. 如果使用 print() 这种 Sink 会报错,此时需要使用 res.writeAsText("…") ,报错,如下:
Exception in thread "main" java.lang.RuntimeException: No data sinks have been created yet. A program needs at least one sink that consumes data. Examples are writing the data set or printing it.

  1. 增加 res.writeAsText("…") 后正常运行,如下:

相关文章