hadoop textoutputformat:向csv输出添加标题

dgiusagp  于 2021-06-03  发布在  Hadoop
关注(0)|答案(1)|浏览(281)

我正在维护一个简单的hadoop作业,它在hdfs中生成csv文件作为输出。作业使用textoutputformat。我想将前导标题行添加到csv文件中(我知道零件文件是由不同的worker创建的,如果每个worker都获得标题,这不是问题)。如何做到这一点?
编辑:级联可以帮助,但乍一看,我不想开始使用一个新的框架
编辑:
所以我想为输出csv文件添加标题。列数是确定的。这是我的减速机类的 backbone :

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public final class Reducer extends Reducer<Text, IntWritable, Text, IntWritable>
{
    private MultipleOutputs<Text, IntWritable> mos;

    private static final Text KEY_HOLDER = new Text();

    private static final IntWritable VALUE_HOLDER = new IntWritable(1);

    @Override
    public void setup(final Context context)
    {
        mos = new MultipleOutputs<Text, IntWritable>(context);
    }

    @Override
    public void cleanup(final Context context) throws IOException, InterruptedException
    {
        mos.close();
    }

    @Override
    public void reduce(final Text key, final Iterable<IntWritable> values, final Context context)
            throws IOException, InterruptedException
    {
        // [... some business logic ...]        
        mos.write(KEY_HOLDER, VALUE_HOLDER, "myFileName");
        context.progress();
    }
}
jv4diomz

jv4diomz1#

您可以重写mapper/reducer类中的run(),以便根据您的要求添加标头。例如,如果您想在最终o/p中添加FirstName和lastname,可以使用下面的代码作为参考。

public void run(Context context) throws IOException, InterruptedException
  {
        setup(context);
        column = new Text("ColumnName") ;
        values = new Text("FirstName" + "\t" + "LastName") ;
        context.write(column, values);
        try
        {
          while (context.nextKey())
          {
            reduce(context.getCurrentKey(), context.getValues(), context);
            Iterator<IntWritable> iter = context.getValues().iterator();
            if(iter instanceof ReduceContext.ValueIterator)
            {              ((ReduceContext.ValueIterator<IntWritable>)iter).resetBackupStore();        
            }
          }
        }
        finally
        {
          cleanup(context);
        }
  }

相关问题