【07】Flink 之 DataSet API(一):Data Source

x33g5p2x  于2021-12-25 转载在 其他  
字(2.7k)|赞(0)|评价(0)|浏览(498)

DataStream 和 DataSet
Flink用DataStream 表示无界数据集,用DataSet表示有界数据集,前者用于流处理应用程序,后者用于批处理应用程序。
从操作形式上看,DataStream 和 DataSet 与集合 Collection 有些相似,但两者有着本质的区别:
(1)DataStream 和 DataSet 是不可变的数据集合,因此不可以想操作集合那样增加或者删除 DataStream 和 DataSet 中的元素,也不可以通过诸如下标等方式访问某个元素。
(2)Flink 应用程序通过 Source 创建 DataStream 对象和 DataSet 对象,通过转换操作产生新的 DataStream 对象和 DataSet 对象。

运行时是应用程序被调度执行时的上下文环境,通过StreamExecutionEnvironment或ExecutionEnvironment方法会根据当前环境自动选择本地或者集群运行时环境。

1、DataSet API之Data Source

  • 基于文件

  • readTextFile(path)

  • 基于集合

  • fromCollection(Collection)
    实际中第一种较长使用,两种的操作方法同DataStream里Data Source的操作

2、基于文件Data Source实践

2.1、Java代码实现

完整代码如下:

package com.Batch;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
/**
 * @Author: Henry
 * @Description: 基于文件,readTextFile(Path)
 * @Date: Create in 2019/5/16 21:28
 **/
public class BatchWordCount {

    public static void main(String[] args) throws Exception{
        String inputPath = "E:\\IdeaProject\\DemoFlink\\data";
        String outPath = "E:\\IdeaProject\\DemoFlink\\data\\result";

        // 获取运行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // 获取文件中的内容
        DataSource<String> text = env.readTextFile(inputPath);

        DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer())
                .groupBy(0)
                .sum(1);
        // 写文件
        counts.writeAsCsv(outPath,"\n"," ").setParallelism(1);
        env.execute("batch word count");

    }

    public static class Tokenizer implements FlatMapFunction<String,Tuple2<String,Integer>>{
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out)
                throws Exception {
            String[] tokens = value.toLowerCase().split("\\W+");
            for (String token: tokens) {
                if(token.length()>0){
                    out.collect(new Tuple2<String, Integer>(token,1));
                }
            }
        }
    }
}

2.2、运行结果

注意:
输出目录文件不能存在,否则会报错,如下:

删除目录后运行:

2.3、Scala代码实现

package cn.Flink

import org.apache.flink.api.scala.ExecutionEnvironment

/**
  * @Author: Henry
  * @Description:
  * @Date: Create in 2019/5/3 15:55 
  **/
object BatchWordCount {
  def main(args: Array[String]): Unit = {

    //  设置文件输入输出路径
    val inputPath = "E:\\IdeaProject\\DemoFlink\\data";       // 目录
    val outPath = "E:\\IdeaProject\\DemoFlink\\data\\resultS1";

    //  1、设置执行环境
    val env = ExecutionEnvironment.getExecutionEnvironment
    //  2、读取文件路径
    val text = env.readTextFile(inputPath)

    //引入隐式转换
    import org.apache.flink.api.scala._

    //  3、进行flatMap打平
    val counts = text
      .flatMap(_.toLowerCase.split("\\W+"))
      .filter(_.nonEmpty)
      .map((_ , 1))
      .groupBy(0)
      .sum(1)

    //  4、输出写入文件
    counts.writeAsCsv(outPath,"\n"," ")
      .setParallelism(1)
    //  5、启动执行环境
    env.execute("batch word count")
  }
}

2.4、运行结果

相关文章