文件系统—如何使用flink java api读取目录(本地文件系统/hdfs)下的文件名

wnrlj8wa  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(1508)

我是Flink的新手。实际上我正在尝试通过flinkjavaapi读取文件和csv转换。
按照我们的要求。a) 需要将文件夹作为输入参数传递,将输出参数作为csv文件名b)需要从本地文件系统/hdfs读取文件c)将相同的数据写入csv
我的代码:

public class WriteToCSV {

    public static void main(String[] args) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(args);
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(params);
        List<String> paths = new ArrayList<String>();
        File dir = new File("C://");
        for (File f : dir.listFiles()) {
              paths.add(f.getName());
        }
        DataSet<String> data = env.fromCollection(paths).rebalance();

        DataSet<Tuple2<String, Integer>> counts = 
                    // split up the lines in pairs (2-tuples) containing: (word,1)
                    data.flatMap(new MySplitter()).groupBy(0).sum(1);

        System.out.println(" data -:"+data);
        data.print();
        counts.writeAsCsv("C://new.csv");
    }
}

class MySplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        // normalize and split the line into words
        String[] tokens = value.split("\\W+");

        // emit the pairs
        for (String token : tokens) {
            if (token.length() > 0) {
                out.collect(new Tuple2<String, Integer>(token, 1));
            }
        }
    }
}

我可以得到文件名(data.print())。但是csv没有创建,服务器控制台中也没有例外。

dxxyhpgq

dxxyhpgq1#

代码中没有任何内容写入csv的原因是您没有调用 env.execute() 之后 counts.writeAsCsv("C://new.csv"); 要进一步改进代码,可以使用 env.readTextFile(path) 它接受一个目录的路径并读取该目录中的所有文件,为每一行生成记录。

相关问题