文章18 | 阅读 10836 | 点赞0
DataStream 和 DataSet
Flink用DataStream 表示无界数据集,用DataSet表示有界数据集,前者用于流处理应用程序,后者用于批处理应用程序。
从操作形式上看,DataStream 和 DataSet 与集合 Collection 有些相似,但两者有着本质的区别:
(1)DataStream 和 DataSet 是不可变的数据集合,因此不可以想操作集合那样增加或者删除 DataStream 和 DataSet 中的元素,也不可以通过诸如下标等方式访问某个元素。
(2)Flink 应用程序通过 Source 创建 DataStream 对象和 DataSet 对象,通过转换操作产生新的 DataStream 对象和 DataSet 对象。
运行时是应用程序被调度执行时的上下文环境,通过StreamExecutionEnvironment或ExecutionEnvironment方法会根据当前环境自动选择本地或者集群运行时环境。
基于文件
readTextFile(path)
基于集合
fromCollection(Collection)
实际中第一种较长使用,两种的操作方法同DataStream里Data Source的操作
完整代码如下:
package com.Batch;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
/**
* @Author: Henry
* @Description: 基于文件,readTextFile(Path)
* @Date: Create in 2019/5/16 21:28
**/
public class BatchWordCount {
public static void main(String[] args) throws Exception{
String inputPath = "E:\\IdeaProject\\DemoFlink\\data";
String outPath = "E:\\IdeaProject\\DemoFlink\\data\\result";
// 获取运行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 获取文件中的内容
DataSource<String> text = env.readTextFile(inputPath);
DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer())
.groupBy(0)
.sum(1);
// 写文件
counts.writeAsCsv(outPath,"\n"," ").setParallelism(1);
env.execute("batch word count");
}
public static class Tokenizer implements FlatMapFunction<String,Tuple2<String,Integer>>{
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out)
throws Exception {
String[] tokens = value.toLowerCase().split("\\W+");
for (String token: tokens) {
if(token.length()>0){
out.collect(new Tuple2<String, Integer>(token,1));
}
}
}
}
}
注意:
输出目录文件不能存在,否则会报错,如下:
删除目录后运行:
package cn.Flink
import org.apache.flink.api.scala.ExecutionEnvironment
/**
* @Author: Henry
* @Description:
* @Date: Create in 2019/5/3 15:55
**/
object BatchWordCount {
def main(args: Array[String]): Unit = {
// 设置文件输入输出路径
val inputPath = "E:\\IdeaProject\\DemoFlink\\data"; // 目录
val outPath = "E:\\IdeaProject\\DemoFlink\\data\\resultS1";
// 1、设置执行环境
val env = ExecutionEnvironment.getExecutionEnvironment
// 2、读取文件路径
val text = env.readTextFile(inputPath)
//引入隐式转换
import org.apache.flink.api.scala._
// 3、进行flatMap打平
val counts = text
.flatMap(_.toLowerCase.split("\\W+"))
.filter(_.nonEmpty)
.map((_ , 1))
.groupBy(0)
.sum(1)
// 4、输出写入文件
counts.writeAsCsv(outPath,"\n"," ")
.setParallelism(1)
// 5、启动执行环境
env.execute("batch word count")
}
}
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/hongzhen91/article/details/90729776
内容来源于网络,如有侵权,请联系作者删除!