文章18 | 阅读 11236 | 点赞0
Keyed State和Operator State,可以以两种:
State-Keyed State 就是基于KeyedStream上的状态。这个状态是跟特定的key绑定的,对KeyedStream流上的每一个key,都对应一个state。
stream.keyBy(…) // 调用keyBy()之后就会变成基于key 的stream,keyBy的返回值是 KeyStream
有四种保存State-Keyed State的数据结构:
其中:“Code”相当于程序中的算子任务,每个任务都会有一个State,通过checkpoint保存快照,保存起来。
恢复快照时,“Code”通过保存的checkpoint进行恢复快照。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 每隔1000 ms进行启动一个检查点【设置checkpoint的周期】
env.enableCheckpointing(1000);
// 高级选项:
// 设置模式为exactly-once (这是默认值)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 确保检查点之间有至少500 ms的间隔【checkpoint最小间隔】
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// 检查点必须在一分钟内完成,或者被丢弃【checkpoint的超时时间】
env.getCheckpointConfig().setCheckpointTimeout(60000);
// 同一时间只允许进行一个检查点
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint【详细解释见备注】
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
默认情况下,state会保存在taskmanager的内存中,checkpoint会存储在JobManager的内存中。
state 的store和checkpoint的位置取决于State Backend的配置
env.setStateBackend(…)
一共有三种State Backend:
修改State Backend的两种方式:
env.setStateBackend(new FsStateBackend("hdfs://namenode:9000/flink/checkpoints"));
或者new MemoryStateBackend()
或者new RocksDBStateBackend(filebackend, true);【需要添加第三方依赖,如下:】
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.11</artifactId>
<version>1.6.0</version>
</dependency>
state.backend: filesystem
state.checkpoints.dir: hdfs://namenode:9000/flink/checkpoints
注意:state.backend的值可以是下面几种:
1、jobmanager (MemoryStateBackend)
2、filesystem (FsStateBackend)
3、rocksdb (RocksDBStateBackend)
package com.Streaming;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
/**
* @Author: Henry
* @Description: checkpoint的实现
* @Date: Create in 2019/5/30 22:45
*/
public class SocketWindowWordCountCheckPoint {
public static void main(String[] args) throws Exception{
//获取需要的端口号
int port;
try {
ParameterTool parameterTool = ParameterTool.fromArgs(args);
port = parameterTool.getInt("port");
}catch (Exception e){
System.err.println("No port set. use default port 9999--java");
port = 9999;
}
//获取flink的运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 每隔1000 ms进行启动一个检查点【设置checkpoint的周期】
env.enableCheckpointing(1000);
// 高级选项:
// 设置模式为exactly-once (这是默认值)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 确保检查点之间有至少500 ms的间隔【checkpoint最小间隔】
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// 检查点必须在一分钟内完成,或者被丢弃【checkpoint的超时时间】
env.getCheckpointConfig().setCheckpointTimeout(60000);
// 同一时间只允许进行一个检查点
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoin
//ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint
//ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: 表示一旦Flink处理程序被cancel后,会删除Checkpoint数据,只有job执行失败的时候才会保存checkpoint
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.setStateBackend(new FsStateBackend("hdfs://master:9000/flink/checkpoints"));
String hostname = "master";
String delimiter = "\n";
//连接socket获取输入的数据
DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter);
// a a c
// a 1
// a 1
// c 1
DataStream<WordWithCount> windowCounts = text.flatMap(
new FlatMapFunction<String, WordWithCount>() {
@Override
public void flatMap(String value, Collector<WordWithCount> out)
throws Exception {
String[] splits = value.split("\\s");
for (String word : splits) {
out.collect(new WordWithCount(word, 1L));
}
}
}).keyBy("word")
.timeWindow(Time.seconds(2), Time.seconds(1))//指定时间窗口大小为2秒,指定时间间隔为1秒
.sum("count");//在这里使用sum或者reduce都可以
/*.reduce(new ReduceFunction<WordWithCount>() {
public WordWithCount reduce(WordWithCount a, WordWithCount b) throws Exception {
return new WordWithCount(a.word,a.count+b.count);
}
})*/
//把数据打印到控制台并且设置并行度
windowCounts.print().setParallelism(1);
//这一行代码一定要实现,否则程序不执行
env.execute("Socket window count");
}
public static class WordWithCount{
public String word;
public long count;
public WordWithCount(){}
public WordWithCount(String word,long count){
this.word = word;
this.count = count;
}
@Override
public String toString() {
return "WordWithCount{" +
"word='" + word + '\'' +
", count=" + count +
'}';
}
}
}
env.setStateBackend(new FsStateBackend("hdfs://master:9000/flink/checkpoints"));
通过代码中设置检查点存储位置,应用于单任务的检查点设置容错恢复
步骤1:
将工程代码通过Maven打jar包上传至集群中
步骤2:
首先,通过Ncat打开一个端口,如9002,再通过yarn的方式提交flink集群运行
集群提交命令如下:
bin/flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 1024 -c com.Streaming.SocketWindowWordCountCheckPoint flink-1.0-SNAPSHOT-jar-with-dependencies.jar --port 9002
提交运行成功会显示如下界面:
步骤3:
通过Yarn的UI页面可以查看任务情况,如下:
步骤4:
通过点击任务进入可查看任务信息,即检查点,如下:
在其中的"Configuration"可以看到检查点的配置信息:
步骤5:
通过终端发送单词,更新检查点存储,查看HDFS存储信息。通过hdfs命令可以查看hdfs路径下会不断更新,存储最新的检查点内容:
步骤6:
查看终端发送单词的统计结果:
以上方法是通过代码进行的单任务配置检查点方法。
通过配置方式设置检查点具有全局的作用,所有任务都会在设置的检查点目录下生成各自JobID对应文件夹的CheckPoint内容。
步骤1:
配置 flink/confi/flink-conf.yaml文件
步骤6:
查看检查点在hdfs上的存储情况:
state.checkpoints.num-retained: 20
hdfs dfs -ls hdfs://namenode:9000/flink/checkpoints
从Checkpoint进行恢复
bin/flink run -s hdfs://namenode:9000/flink/checkpoints/467e17d2cc343e6c56255d222bae3421/chk-56/_metadata flink-job.jar
恢复运行提交命令如下:
bin/flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 1024 -s hdfs://master:9000/flink/checkpoints/467e17d2cc343e6c56255d222bae3421/chk-56/_metadata -c com.Streaming.SocketWindowWordCountCheckPoint flink-1.0-SNAPSHOT-jar-with-dependencies.jar --port 9002
1:在flink-conf.yaml中配置Savepoint存储位置
不是必须设置,但是设置后,后面创建指定Job的Savepoint时,可以不用在手动执行命令时指定Savepoint的位置
state.savepoints.dir: hdfs://namenode:9000/flink/savepoints
2:触发一个savepoint【直接触发或者在cancel的时候触发】
bin/flink savepoint jobId [targetDirectory] [-yid yarnAppId]【针对on yarn模式需要指定-yid参数】
bin/flink cancel -s [targetDirectory] jobId [-yid yarnAppId]【针对on yarn模式需要指定-yid参数】
3:从指定的savepoint启动job
bin/flink run -s savepointPath [runArgs]
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/hongzhen91/article/details/90752579
内容来源于网络,如有侵权,请联系作者删除!