reduce不会启动

9wbgstp7  于 2021-05-30  发布在  Hadoop
关注(0)|答案(3)|浏览(264)

下面是我使用自定义可写程序实现简单mapreduce作业的代码。

public class MapReduceKMeans {

public static class MapReduceKMeansMapper extends
        Mapper<Object, Text, SongDataPoint, Text> {
    public void map(Object key, Text value, Context context)
            throws InterruptedException, IOException {
        String str = value.toString();
        // Reading Line one by one from the input CSV.
        String split[] = str.split(",");

        String trackId = split[0];
        String title = split[1];
        String artistName = split[2];
        SongDataPoint songDataPoint = 
                new SongDataPoint(new Text(trackId), new Text(title), 
                        new Text(artistName));
        context.write(songDataPoint, new Text());
        }
    }

public static class MapReduceKMeansReducer extends
Reducer<SongDataPoint, Text, Text, NullWritable> {
    public void reduce(SongDataPoint key, Iterable<Text> values,
            Context context) throws IOException, InterruptedException {
        StringBuilder sb = new StringBuilder();
        sb.append(key.getTrackId()).append("\t").
        append(key.getTitle()).append("\t")
        .append(key.getArtistName()).append("\t");

        String write = sb.toString();

        context.write(new Text(write), NullWritable.get());
    }

}

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args)
            .getRemainingArgs();

    if (otherArgs.length != 2) {
        System.err
                .println("Usage:<CsV Out Path> <Final Out Path>");
        System.exit(2);
    }

    Job job = new Job(conf, "Song Data Trial");
    job.setJarByClass(MapReduceKMeans.class);
    job.setMapperClass(MapReduceKMeansMapper.class);
    job.setReducerClass(MapReduceKMeansReducer.class);
    job.setOutputKeyClass(SongDataPoint.class);
    job.setOutputValueClass(Text.class);

    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

调试时,我的代码读取csv文件中的所有行,但它根本不进入reduce作业。
我还将songdatapoint用作我的自定义可写程序。
其代码如下。

public class SongDataPoint implements WritableComparable<SongDataPoint> {

Text trackId;
Text title;
Text artistName;

public SongDataPoint() {
    this.trackId = new Text();
    this.title = new Text();
    this.artistName = new Text();
}

public SongDataPoint(Text trackId, Text title, Text artistName) {
    this.trackId = trackId;
    this.title = title;
    this.artistName = artistName;
}

@Override
public void readFields(DataInput in) throws IOException {
    this.trackId.readFields(in);
    this.title.readFields(in);
    this.artistName.readFields(in);
}

@Override
public void write(DataOutput out) throws IOException {

}

public Text getTrackId() {
    return trackId;
}

public void setTrackId(Text trackId) {
    this.trackId = trackId;
}

public Text getTitle() {
    return title;
}

public void setTitle(Text title) {
    this.title = title;
}

public Text getArtistName() {
    return artistName;
}

public void setArtistName(Text artistName) {
    this.artistName = artistName;
}

@Override
public int compareTo(SongDataPoint o) {
    // TODO Auto-generated method stub
    int compare = getTrackId().compareTo(o.getTrackId());
    return compare;
}

}

感谢您的帮助。谢谢。

ycl3bljg

ycl3bljg1#

CustomWriteable类中的write方法被错误地留空。在编写了正确的代码之后,它解决了这个问题。

public void write(DataOutput out) throws IOException {

}
qvsjd97n

qvsjd97n2#

您还应该指定Map器输出值,如下所示。

job.setMapOutputKeyClass(SongDataPoint.class);
job.setMapOutputValueClass(Text.class);
ix0qys7i

ix0qys7i3#

根据驱动程序,您的输出键类是songdatapoint.class,而输出值类是text.class,但实际上您是将文本作为reducer中的键来编写,并将nullwritable作为reducer中的值来编写。

相关问题