Hadoop PutMerge not working

f4t66c6m  asked on 2021-06-03  in Hadoop
Follow (0) | Answers (2) | Views (439)

I am trying to compile and run Hadoop's PutMerge program from Eclipse, but it does not work: instead of creating the merged file on HDFS, it creates it on my local machine (the one where Eclipse is installed).
It looks like my Configuration is not picking up the correct settings from the XML files.
Edit:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PlayWithHadoop {

    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        //conf.set("fs.default.name", "hdfs://localhost:54310/user/hduser");
        //conf.set("dfs.data.dir", "/user/hduser");
        conf.addResource(new Path("/home/hduser/hadoop/conf/core-site.xml"));
        conf.addResource(new Path("/home/hduser/hadoop/conf/hdfs-site.xml"));

        FileSystem hdfs = FileSystem.get(conf);
        FileSystem local = FileSystem.getLocal(conf);
        Path inputDir = new Path(args[0]);
        Path hdfsFile = new Path(args[1]);

        try {
            //hdfs.setWorkingDirectory(new Path("/user/hduser/hadoop"));
            FileStatus[] inputFiles = local.listStatus(inputDir);
            FSDataOutputStream out = hdfs.create(hdfsFile);

            for (int i = 0; i < inputFiles.length; i++) {
                System.out.println(inputFiles[i].getPath().getName());
                FSDataInputStream in = local.open(inputFiles[i].getPath());
                System.out.println();
                System.out.println(hdfs.getWorkingDirectory().toString());

                byte[] buffer = new byte[256];
                int bytesRead = 0;
                while ((bytesRead = in.read(buffer)) > 0) {
                    out.write(buffer, 0, bytesRead);
                }
                in.close();
            }
            out.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
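
For reference, a quick way to check whether those XML resources are actually being loaded is to print the URI of the FileSystem that the Configuration resolves to: if it prints file:/// instead of an hdfs:// address, the files were not picked up and FileSystem.get(conf) falls back to the local filesystem, which would explain the behaviour above. A minimal sketch (the class name CheckFs is only illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CheckFs {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.addResource(new Path("/home/hduser/hadoop/conf/core-site.xml"));
        conf.addResource(new Path("/home/hduser/hadoop/conf/hdfs-site.xml"));

        // Prints file:/// when the default (local) filesystem is used and
        // hdfs://<namenode>:<port> when core-site.xml was loaded correctly.
        System.out.println(FileSystem.get(conf).getUri());

        // The raw property can be inspected too: fs.default.name on older
        // Hadoop releases, fs.defaultFS on newer ones.
        System.out.println(conf.get("fs.default.name"));
    }
}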

rkue9o1l1#

Try the code below, and make sure you pass the correct input and output paths to the jar.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 *
 * @author hadoop1
 */
public class PutMerge {

    /**
     * @param args the command line arguments
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException {

        // Configuration reads its name/value pairs from XML resources.
        // The constructor automatically loads core-site.xml, which should contain fs.defaultFS.
        Configuration conf = new Configuration();

        // The FileSystem abstract class uses that configuration to connect to HDFS.
        // Only one key is needed for the connection: previously fs.default.name,
        // renamed to fs.defaultFS from the YARN releases onward, so the following is sufficient.
        FileSystem hdfs = FileSystem.get(conf);
        // For the local filesystem
        FileSystem local = FileSystem.getLocal(conf);

        Path inputDirectory = new Path(args[0]);
        Path hdfsFiles = new Path(args[1]);

        try {

            FileStatus[] inputFiles = local.listStatus(inputDirectory);
            FSDataOutputStream out = hdfs.create(hdfsFiles);

            for (int i = 0; i < inputFiles.length; i++) {
                //System.out.println(inputFiles[i].getPath().getName());

                FSDataInputStream in = local.open(inputFiles[i].getPath());
                byte buffer[] = new byte[256];
                int byteRead = 0;

                while ((byteRead = in.read(buffer)) > 0) {
                    out.write(buffer, 0, byteRead);
                }
                in.close();
            }
            out.close();
        } catch (Exception e) {
            // Print the full error instead of a generic message
            e.printStackTrace();
        }

        Job job = Job.getInstance(conf, "PutMerge");
        job.setJarByClass(PutMerge.class);
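        // PutMergeMapper and PutMergeReducer are not shown in this answer; they are
        // assumed to be mapper/reducer classes defined elsewhere in the same jar.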
        job.setMapperClass(PutMergeMapper.class);
        job.setCombinerClass(PutMergeReducer.class);
        job.setReducerClass(PutMergeReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setOutputValueClass(FloatWritable.class);

        // The merged file on HDFS (possibly built from multiple CSVs) becomes the job input
        FileInputFormat.addInputPath(job, hdfsFiles);
        FileOutputFormat.setOutputPath(job, new Path(args[2]));

        try {
            // Block until the MapReduce job finishes, then exit with its status
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (InterruptedException ex) {
            System.out.print("The job was interrupted before it completed");
        }
    }
}
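
For reference, a possible way to run it (the jar name putmerge.jar and the paths below are only examples): the first argument is the local input directory, the second the HDFS path for the merged file, and the third the HDFS output directory of the MapReduce job.

hadoop jar putmerge.jar PutMerge /home/hduser/input /user/hduser/merged /user/hduser/putmerge-output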

xam8gpfp2#

Add these lines to your code and see if it works:

Configuration conf = new Configuration();
// HADOOP_HOME here stands for your Hadoop installation directory,
// e.g. /home/hduser/hadoop as in the question's code.
conf.addResource(new Path("/HADOOP_HOME/conf/core-site.xml"));
conf.addResource(new Path("/HADOOP_HOME/conf/hdfs-site.xml"));
