Apache Crunch fails to write output

7ivaypg9 · asked 2021-05-31 · in Hadoop

It's probably an oversight on my part, but I can't figure out why Apache Crunch won't write output to a file for a very simple program I'm writing to learn Crunch.
The code is below:

import org.apache.crunch.PCollection;
import org.apache.crunch.Pipeline;
import org.apache.crunch.Target.WriteMode;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.io.To;
import org.apache.hadoop.conf.Configuration;

....
private Pipeline                  pipeline;
private Configuration             etlConf;

....
this.etlConf  = getConf();
this.pipeline = new MRPipeline(TestETL.class, etlConf);
....

// Read file
logger.info("Reading input file: " + inputFileURI.toString());
PCollection<String> input = pipeline.readTextFile(inputFileURI.toString());

System.out.println("INPUT SIZE = " + input.asCollection().getValue().size());

// Write file 
logger.info("Writing Final output to file: " + outputFileURI.toString());
input.write(
    To.textFile(outputFileURI.toString()),
    WriteMode.OVERWRITE
);

These are the logs I see when I execute the jar with hadoop:

18/12/31 09:41:51 INFO etl.TestClass: Executing Test run
18/12/31 09:41:51 INFO etl.TestETL: Reading input file: /user/sw029693/process_analyzer/input/input.txt
INPUT SIZE = 3
18/12/31 09:41:51 INFO etl.TestETL: Writing Final output to file: 
/user/sw029693/process_analyzer/output/occurences
18/12/31 09:41:51 INFO impl.FileTargetImpl: Will write output files to new path: /user/sw029693/process_analyzer/output/occurences
18/12/31 09:41:51 INFO etl.TestETL: Cleaning-up TestETL run
18/12/31 09:41:51 INFO etl.TestETL: ETL completed with status 0.

The input file is very simple and looks like this:

this is line 1
this is line 2
this is line 3

Although the logging suggests a write should have happened at the output location, I don't see any file being created. Any ideas?
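For context, a likely cause (not confirmed by the logs alone): Crunch plans its work lazily. `PCollection.write()` only registers an output target, and no MapReduce job is submitted until `pipeline.done()` (or `run()`) is called; the snippet above never shows such a call, and no job-submission lines appear in the logs. The same lazy-evaluation pattern can be demonstrated with plain `java.util.stream` (the class and method names here are illustrative only):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

public class LazyDemo {
    // Returns how many elements the peek() side effect saw
    // before and after the terminal operation runs.
    static int[] run() {
        List<String> seen = new ArrayList<>();
        // Intermediate op: registered, but nothing executes yet
        // (analogous to PCollection.write() registering a target).
        Stream<String> s = Stream.of("a", "b", "c").peek(seen::add);
        int before = seen.size();   // still 0: no terminal op has run
        s.forEach(x -> { });        // terminal op triggers the whole pipeline
        int after = seen.size();    // now 3 (analogous to pipeline.done())
        return new int[] { before, after };
    }

    public static void main(String[] args) {
        int[] r = LazyDemo.run();
        System.out.println("before=" + r[0] + " after=" + r[1]);
    }
}
```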


fcwjkofz · answer 1

package com.hadoop.crunch;

import java.io.*;
import java.util.Collection;
import java.util.Iterator;

import org.apache.crunch.*;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.io.From;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.util.*;
import org.apache.log4j.Logger;

public class App extends Configured implements Tool, Serializable{
    private static final long serialVersionUID = 1L;
    private static Logger LOG = Logger.getLogger(App.class);

    @Override
    public int run(String[] args) throws Exception {
        final Path fileSource = new Path(args[0]);
        final Path outFileName = new Path(args[1], "event-" + System.currentTimeMillis() + ".txt");

        //MRPipeline translates the overall pipeline into one or more MapReduce jobs
        Pipeline pipeline = new MRPipeline(App.class, getConf());
        //Specify the input data to the pipeline. 
        //The input data is contained in PCollection
        PCollection<String> inDataPipe = pipeline.read(From.textFile(fileSource));

        //inject an operation into the crunch data pipeline
        PObject<Collection<String>> dataCollection = inDataPipe.asCollection();

        //iterate over the collection; getValue() materializes the PObject,
        //which forces Crunch to actually execute the pipeline up to this point
        Iterator<String> iterator = dataCollection.getValue().iterator();
        //note: this writes via the *local* filesystem API, bypassing Crunch output targets
        FileSystem fs = FileSystem.getLocal(getConf());
        BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(fs.create(outFileName, true)));

        while(iterator.hasNext()){
            String data = iterator.next().toString();
            bufferedWriter.write(data);
            bufferedWriter.newLine();
        }

        bufferedWriter.close();

        //Finish the crunch pipeline: runs any remaining MR jobs and cleans up temporary resources
        PipelineResult result = pipeline.done();

        return result.succeeded() ? 0 : 1;
    }

    public static void main(String[] args) {
        if (args.length != 2) throw new RuntimeException("Usage: hadoop jar <app.jar> <inputPath> <outputPath>");
        try {
            ToolRunner.run(new Configuration(), new App(), args );
        } catch (Exception e) {
            LOG.error(e.getLocalizedMessage());
        }
    }

}

Usage: run as a Java program with two arguments: the first is the input file name or directory, the second is the output directory. The output file name is event-<timestamp>.txt. Remember there is a single space between args{0} & args{1}: /user/sw029693/process_analyzer/input/input.txt /user/sw029693/process_analyzer/input/
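The answer above works by materializing the collection and writing it by hand, which bypasses Crunch's output targets. If the goal is to keep the original `To.textFile` target from the question, a smaller fix is simply to finish the pipeline. The sketch below is hedged: it reuses the question's intent with a hypothetical class name, and it only runs on a Hadoop/Crunch classpath, so treat it as an illustration rather than a tested program:

```java
import org.apache.crunch.PCollection;
import org.apache.crunch.Pipeline;
import org.apache.crunch.PipelineResult;
import org.apache.crunch.Target;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.io.To;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Hypothetical class name; requires the Crunch and Hadoop jars at runtime.
public class WriteFixSketch extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        Pipeline pipeline = new MRPipeline(WriteFixSketch.class, getConf());
        PCollection<String> input = pipeline.readTextFile(args[0]);

        // Registering the target is not enough on its own...
        input.write(To.textFile(args[1]), Target.WriteMode.OVERWRITE);

        // ...Crunch is lazy: done() plans and submits the MapReduce job(s),
        // waits for completion, and cleans up. Without it, no output appears.
        PipelineResult result = pipeline.done();
        return result.succeeded() ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new WriteFixSketch(), args));
    }
}
```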
