env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile")
File myFile = getRuntimeContext().getDistributedCache().getFile("hdfsFile");
Explanation:
The program registers a file (or a local directory) under a name of its choosing. When the job executes, Flink automatically copies the registered file to the local file system of each TaskManager node, so user functions read it from the local TaskManager node with no network IO at access time.
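As a minimal sketch of this register-then-read pattern (the HDFS path and the alias "hdfsFile" are placeholders; the commented three-argument overload with the executable flag is shown for reference only):

// In the driver program: register the file under an alias (placeholder path)
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile");
// Optional overload: a third boolean argument marks the cached file as executable
// env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile", true);

// In any rich user function (typically in open()): resolve the alias to the local copy
File myFile = getRuntimeContext().getDistributedCache().getFile("hdfsFile");

The complete Java example below shows the same pattern inside a RichMapFunction.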
package com.Batch.BatchAPI;

import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.configuration.Configuration;

import java.io.File;
import java.util.ArrayList;
import java.util.List;

/**
 * @Author: Henry
 * @Description: Distributed Cache
 * @Date: Create in 2019/5/26 21:28
 **/
public class BatchDemoDisCache {

    public static void main(String[] args) throws Exception {
        // Get the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 1: Register a file (an HDFS or S3 path also works); "a.txt" is the alias used to look it up later
        env.registerCachedFile("E:\\IdeaProject\\DemoFlink\\data\\a.txt", "a.txt");

        DataSource<String> data = env.fromElements("a", "b", "c", "d");

        DataSet<String> result = data.map(new RichMapFunction<String, String>() {

            private ArrayList<String> dataList = new ArrayList<>();

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                // 2: Use the file - resolve the alias to the TaskManager-local copy via the runtime context
                File myFile = getRuntimeContext()
                        .getDistributedCache()
                        .getFile("a.txt");
                List<String> lines = FileUtils.readLines(myFile);
                for (String line : lines) {
                    this.dataList.add(line);
                    System.out.println("line:" + line);
                }
            }

            @Override
            public String map(String value) throws Exception {
                // dataList is available here
                return "e: " + value;
            }
        });

        result.print();
    }
}
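Note that map() above only prefixes each element and never actually touches dataList. As a minimal sketch of what the comment hints at (a hypothetical variant, not part of the original), the method body could use the cached lines to enrich each record:

            @Override
            public String map(String value) throws Exception {
                // Hypothetical use of the cached data: tag the record if it appears in the cached file
                if (dataList.contains(value)) {
                    return "cached: " + value;
                }
                return "e: " + value;
            }

The equivalent Scala version follows.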
package cn.Batch

import org.apache.commons.io.FileUtils
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration

/**
  * @Author: Henry
  * @Description: Distributed Cache
  * @Date: Create in 2019/5/26 21:28
  **/
object BatchDemoDisCache {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._

    // 1: Register the file; "b.txt" is the alias it will be looked up by
    env.registerCachedFile("E:\\IdeaProject\\DemoFlink\\data\\a.txt", "b.txt")

    val data = env.fromElements("a", "b", "c", "d")

    val result = data.map(new RichMapFunction[String, String] {

      override def open(parameters: Configuration): Unit = {
        super.open(parameters)
        // 2: Use the file - resolve the alias to the local copy
        val myFile = getRuntimeContext
          .getDistributedCache
          .getFile("b.txt")
        val lines = FileUtils.readLines(myFile)  // returns a java.util.List[String]
        val it = lines.iterator()
        while (it.hasNext) {
          val line = it.next()
          println("line:" + line)
        }
      }

      override def map(value: String): String = {
        value
      }
    })

    result.print()
  }
}
Copyright notice: this article is a reprint; copyright belongs to the original author.
Original link: https://blog.csdn.net/hongzhen91/article/details/90751745