【11】Flink 之 DataSet API(五):Distributed Cache(分布式缓存)

x33g5p2x  于2021-12-25 转载在 其他  
字(3.0k)|赞(0)|评价(0)|浏览(501)

1、Distributed Cache(分布式缓存)

  • Flink提供了一个分布式缓存,类似于hadoop,可以使用户在并行函数中很方便的读取本地文件
  • 此缓存的工作机制如下:程序注册一个文件或者目录(本地或者远程文件系统,例如hdfs或者s3),通过ExecutionEnvironment注册缓存文件并为它起一个名称。当程序执行,Flink自动将文件或者目录复制到所有taskmanager节点的本地文件系统,用户可以通过这个指定的名称查找文件或者目录,然后从taskmanager节点的本地文件系统访问它
  • 用法:
  1. 注册一个文件
env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile")
  1. 访问数据
File myFile = getRuntimeContext().getDistributedCache().getFile("hdfsFile");

2、分布式缓存实践

说明:
  程序注册一个文件或本地目录,通过注册缓存文件并
  为它起一个名称,程序执行时自动拷贝到taskmanager节点
  本地文件系统,从taskmanager本地节点访问,没有网络IO

2.1、Java代码实现

package com.Batch.BatchAPI;

import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.configuration.Configuration;

import java.io.File;
import java.util.ArrayList;
import java.util.List;

/**
 * @Author: Henry
 * @Description: Distributed Cache 分布式缓存
 *
 *
 * @Date: Create in 2019/5/26 21:28
 **/
public class BatchDemoDisCache {

    public static void main(String[] args) throws Exception{

        //获取运行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        //1:注册一个文件,可以使用hdfs或者s3上的文件, name:"a.txt",此处相当于起个别名
        env.registerCachedFile("E:\\IdeaProject\\DemoFlink\\data\\a.txt","a.txt");

        DataSource<String> data = env.fromElements("a", "b", "c", "d");

        DataSet<String> result = data.map(new RichMapFunction<String, String>() {
            private ArrayList<String> dataList = new ArrayList<>();

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                //2:使用文件
                File myFile = getRuntimeContext()   // 获取上下文
                        .getDistributedCache()
                        .getFile("a.txt");
                List<String> lines = FileUtils.readLines(myFile);
                for (String line : lines) {
                    this.dataList.add(line);
                    System.out.println("line:" + line);
                }
            }

            @Override
            public String map(String value) throws Exception {
                //在这里就可以使用dataList
                return "e: " +  value;
            }
        });

        result.print();
    }
}

2.3、Scala代码实现

package cn.Batch

import org.apache.commons.io.FileUtils
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration
/**
  * @Author: Henry
  * @Description: Distributed Cache分布式缓存操作
  * @Date: Create in 2019/5/26 21:28
  **/
object BatchDemoDisCache {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment

    import org.apache.flink.api.scala._

    //1:注册文件
    env.registerCachedFile("E:\\IdeaProject\\DemoFlink\\data\\a.txt","b.txt")

    val data = env.fromElements("a","b","c","d")

    val result = data.map(new RichMapFunction[String,String] {

      override def open(parameters: Configuration): Unit = {
        super.open(parameters)
        val myFile = getRuntimeContext
          .getDistributedCache
          .getFile("b.txt")
        val lines = FileUtils.readLines(myFile)
        val it = lines.iterator()
        while (it.hasNext){
          val line = it.next()
          println("line:"+line)
        }
      }
      override def map(value: String) = {
        value
      }
    })

    result.print()
  }
}

2.3、运行结果

相关文章