Suppose a Flink job runs with parallelism 8 on two machines, with 4 parallel subtasks per machine, and the map operator needs a data set for its computation. Normally 8 copies of that set must be maintained, one per map subtask. With a broadcast variable, each machine keeps only a single copy, and the map subtasks read it directly from their local node.
1: Initialize the data
DataSet<Integer> toBroadcast = env.fromElements(1, 2, 3);
2: Broadcast the data
.withBroadcastSet(toBroadcast, "broadcastSetName");
3: Access the data
Collection<Integer> broadcastSet = getRuntimeContext().getBroadcastVariable("broadcastSetName");
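Putting the three steps together, a minimal runnable sketch (the class name BroadcastStepsSketch and the sample data are illustrative, not from the original post):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import java.util.Collection;

public class BroadcastStepsSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        //1: Initialize the data to broadcast
        DataSet<Integer> toBroadcast = env.fromElements(1, 2, 3);
        DataSet<String> data = env.fromElements("a", "b");
        DataSet<String> result = data.map(new RichMapFunction<String, String>() {
            Collection<Integer> broadcastSet;
            @Override
            public void open(Configuration parameters) throws Exception {
                //3: Access the broadcast data inside a rich function
                broadcastSet = getRuntimeContext().getBroadcastVariable("broadcastSetName");
            }
            @Override
            public String map(String value) {
                return value + ":" + broadcastSet;
            }
        //2: Broadcast the data by registering it on the operator
        }).withBroadcastSet(toBroadcast, "broadcastSetName");
        result.print();
    }
}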
Separately, DataStream has a broadcast() partitioning method, which sends every element to all parallel instances of the next operator:
dataStream.broadcast()
package com.Streaming.StreamAPI;
import com.Streaming.custormSource.MyNoParalleSource;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
/**
 * @Author: Henry
 * @Description: broadcast partitioning rule
 * @Date: Create in 2019/5/12 13:58
 **/
public class StreamingDemoWithMyNoPralalleSourceBroadcast {
public static void main(String[] args) throws Exception {
// Get the Flink execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
// Get the data source
DataStreamSource<Long> text = env.addSource(new MyNoParalleSource())
.setParallelism(1);
// DataStream<Long> num = text.map(new MapFunction<Long, Long>() { // without broadcast(): each element is printed once
DataStream<Long> num = text.broadcast().map(new MapFunction<Long, Long>() { // with broadcast(): each element is printed once per map subtask
@Override
public Long map(Long value) throws Exception {
long id = Thread.currentThread().getId();
System.out.println("线程id:"+id+",接收到数据:" + value);
return value;
}
});
// Process the data in 2-second windows
DataStream<Long> sum = num.timeWindowAll(Time.seconds(2)).sum(0);
// Print the result
sum.print().setParallelism(1);
String jobName = StreamingDemoWithMyNoPralalleSourceBroadcast.class.getSimpleName();
env.execute(jobName);
}
}
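With broadcast() and map parallelism 4, every value emitted by the single-parallelism source reaches all 4 map subtasks, so each value is printed 4 times and the 2-second windowed sum is 4 times the sum of the source values. Illustrative output for one value (actual thread ids will differ):

Thread id: 56, received: 1
Thread id: 57, received: 1
Thread id: 58, received: 1
Thread id: 59, received: 1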
This partitioning method is rarely used in practice. The "broadcast" usually referred to is the broadcast variable (Broadcast), whose use is demonstrated below.
Requirement:
Flink reads user names from the data source,
and the final output must print each user's name together with their age.
Analysis:
The age information therefore has to be looked up inside the intermediate map operation.
Suggestion:
Distribute the user relation data set as a broadcast variable.
Note:
If multiple operators need the same data set,
the broadcast variable must be registered separately on each of those operators, as in the sketch below.
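A minimal sketch of that registration pattern, assuming two map operators share one broadcast set (class and variable names are illustrative):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import java.util.List;

public class BroadcastTwoOperators {
    // A mapper that reads the shared broadcast set in open()
    static class TagMapper extends RichMapFunction<String, String> {
        private List<Integer> shared;
        @Override
        public void open(Configuration parameters) throws Exception {
            shared = getRuntimeContext().getBroadcastVariable("sharedSet");
        }
        @Override
        public String map(String value) {
            return value + shared;
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Integer> sharedSet = env.fromElements(1, 2, 3);
        DataSet<String> data = env.fromElements("a", "b");
        DataSet<String> first = data.map(new TagMapper())
                .withBroadcastSet(sharedSet, "sharedSet");   // register on the first operator
        DataSet<String> second = first.map(new TagMapper())
                .withBroadcastSet(sharedSet, "sharedSet");   // and register again on the second
        second.print();
    }
}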
package com.Batch.BatchAPI;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
/**
 * @Author: Henry
 * @Description: broadcast variable: similar to a common shared variable
 *
 * @Date: Create in 2019/5/26 21:28
 **/
public class BatchDemoBroadcast {
public static void main(String[] args) throws Exception{
// Get the execution environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//1: Prepare the data to broadcast
ArrayList<Tuple2<String, Integer>> broadData = new ArrayList<>();
broadData.add(new Tuple2<>("zs",18));
broadData.add(new Tuple2<>("ls",20));
broadData.add(new Tuple2<>("ww",17));
DataSet<Tuple2<String, Integer>> tupleData = env.fromCollection(broadData);
//1.1: Transform the broadcast data set into HashMap elements
DataSet<HashMap<String, Integer>> toBroadcast = tupleData
.map(new MapFunction<Tuple2<String, Integer>, HashMap<String, Integer>>() {
@Override
public HashMap<String, Integer> map(Tuple2<String, Integer> value)
throws Exception {
HashMap<String, Integer> res = new HashMap<>();
res.put(value.f0, value.f1);
return res;
}
});
// Source data
DataSource<String> data = env.fromElements("zs", "ls", "ww");
DataSet<String> result = data.map(
new RichMapFunction<String, String>() {
List<HashMap<String, Integer>> broadCastMap = new ArrayList<>();
HashMap<String, Integer> allMap = new HashMap<>();
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
//3: Get the broadcast data
this.broadCastMap = getRuntimeContext()
.getBroadcastVariable("broadCastMapName");
// Merge the list of maps into a single map
for (HashMap map : broadCastMap) {
allMap.putAll(map);
}
}
@Override
public String map(String name) throws Exception {
Integer age = allMap.get(name);
return name + "," + age;
}
//2: Register the data set to broadcast
}).withBroadcastSet(toBroadcast, "broadCastMapName");
result.print();
}
}
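Running this job should print each name joined with the age looked up from the broadcast map (output order may vary with parallelism):

zs,18
ls,20
ww,17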
package cn.Batch
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration
import scala.collection.mutable.ListBuffer
/**
 * @Author: Henry
 * @Description: broadcast variable operations
 * @Date: Create in 2019/5/16 21:28
 **/
object BatchDemoBroadcast {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
//1: Prepare the data to broadcast
val broadData = ListBuffer[Tuple2[String,Int]]()
broadData.append(("zs",18))
broadData.append(("ls",20))
broadData.append(("ww",17))
//1.1: Transform the broadcast data
val tupleData = env.fromCollection(broadData)
val toBroadcastData = tupleData.map(tup=>{
Map(tup._1->tup._2)
})
val text = env.fromElements("zs","ls","ww")
val result = text.map(new RichMapFunction[String,String] {
var listData: java.util.List[Map[String,Int]] = _ // a java.util.List
var allMap = Map[String,Int]()
override def open(parameters: Configuration): Unit = {
super.open(parameters)
this.listData = getRuntimeContext
.getBroadcastVariable[Map[String,Int]]("broadcastMapName")
val it = listData.iterator()
while (it.hasNext){
val next = it.next()
allMap = allMap.++(next) // equivalent to Java's allMap.putAll(map)
}
}
override def map(name: String) = {
val age = allMap.get(name).get
name+","+age
}
}).withBroadcastSet(toBroadcastData,"broadcastMapName")
result.print()
}
}
An accumulator is used in four steps:
1: Create the accumulator
private IntCounter numLines = new IntCounter();
2: Register the accumulator
getRuntimeContext().addAccumulator("num-lines", this.numLines);
3: Use the accumulator
this.numLines.add(1);
4: Get the accumulator result once the job has finished
myJobExecutionResult.getAccumulatorResult("num-lines");
package com.Batch.BatchAPI;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.configuration.Configuration;
/**
 * @Author: Henry
 * @Description: global accumulator, counter
 *
 * @Date: Create in 2019/5/26 21:28
 **/
public class BatchDemoCounter {
public static void main(String[] args) throws Exception{
// Get the execution environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSource<String> data = env.fromElements("a", "b", "c", "d");
DataSet<String> result = data.map(
new RichMapFunction<String, String>() {
//1: Create the accumulator
private IntCounter numLines = new IntCounter();
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
//2: Register the accumulator
getRuntimeContext().addAccumulator(
"num-lines",this.numLines);
}
//int sum = 0;
@Override
public String map(String value) throws Exception {
// With parallelism 1 a plain local sum would work, but with multiple
// parallel subtasks each subtask keeps its own copy, so a plain sum is wrong
//sum++;
//System.out.println("sum:"+sum);
//3: Use the accumulator
this.numLines.add(1);
return value;
}
}).setParallelism(8);
//result.print();
result.writeAsText("E:\\IdeaProject\\DemoFlink\\data\\count10");
JobExecutionResult jobResult = env.execute("counter");
//4: Get the accumulator result
int num = jobResult.getAccumulatorResult("num-lines");
System.out.println("num:"+num);
}
}
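Since the source emits four elements, the accumulator aggregated across all 8 parallel subtasks prints num:4, whereas the commented-out plain sum would produce one independent count per subtask.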
package cn.Batch
import org.apache.flink.api.common.accumulators.IntCounter
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration
/**
 * @Author: Henry
 * @Description: counter: global accumulator operations
 * @Date: Create in 2019/5/26 21:28
 **/
object BatchDemoCounter {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
val data = env.fromElements("a","b","c","d")
val res = data.map(new RichMapFunction[String,String] {
//1: Define the accumulator
val numLines = new IntCounter
override def open(parameters: Configuration): Unit = {
super.open(parameters)
//2: Register the accumulator
getRuntimeContext.addAccumulator("num-lines",this.numLines)
}
override def map(value: String) = {
//3: Use the accumulator
this.numLines.add(1)
value
}
}).setParallelism(4)
res.writeAsText("E:\\IdeaProject\\DemoFlink\\data\\count20")
val jobResult = env.execute("BatchDemoCounterScala")
//4: Get the accumulator result
val num = jobResult.getAccumulatorResult[Int]("num-lines")
println("num:"+num)
}
}
Note: in the DataSet API, print() triggers job execution immediately. If you call print() and then env.execute(), no lazy sink is left for the second execution, and Flink throws:
Exception in thread "main" java.lang.RuntimeException: No data sinks have been created yet. A program needs at least one sink that consumes data. Examples are writing the data set or printing it.
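A minimal sketch of the pitfall, continuing the BatchDemoCounter job above (the output path is the one from the demo):

// Wrong: DataSet.print() triggers execution eagerly, so a later
// env.execute() has no remaining sink and throws the exception above:
// result.print();
// env.execute("counter");

// Right: attach a lazy sink, then execute once and read the accumulator:
result.writeAsText("E:\\IdeaProject\\DemoFlink\\data\\count10");
JobExecutionResult jobResult = env.execute("counter");

Alternatively, in newer Flink versions the result of the eager print() execution, including its accumulators, can be read via env.getLastJobExecutionResult().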
Reprinted article; original: https://blog.csdn.net/hongzhen91/article/details/90407818