java在hadoop中用hdfs保存json数据

kmb7vmvb  于 2021-06-04  发布在  Hadoop
关注(0)|答案(2)|浏览(645)

我有以下课程

public static class TokenCounterReducer extends Reducer<Text, Text, Text, Text> {
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {

        JSONObject jsn = new JSONObject();

        for (Text value : values) {
            String[] vals = value.toString().split("\t");
            String[] targetNodes = vals[0].toString().split(",",-1);
            jsn.put("source",vals[1] );
            jsn.put("target",targetNodes);

        }
        // context.write(key, new Text(sum));
    }
}

通过示例(这里的免责声明:newbie),我可以看到一般输出类型似乎类似于键/值存储。
但是如果输出中没有任何键怎么办?或者如果我的输出是其他格式的(在我的例子中是json),我想怎么办?
总之,从上面的代码:我想写 json 反对hdfs?
它在hadoop流媒体中非常微不足道。。但在hadoopjava中如何实现呢?

ghhaqwfi

ghhaqwfi1#

您可以使用hadoop的outputformat接口来创建自定义格式,这些格式将按照您的意愿写入数据。例如,如果需要将数据写入json对象,则可以执行以下操作:

public class JsonOutputFormat extends TextOutputFormat<Text, IntWritable> {
    @Override
    public RecordWriter<Text, IntWritable> getRecordWriter(
            TaskAttemptContext context) throws IOException, 
                  InterruptedException {
        Configuration conf = context.getConfiguration();
        Path path = getOutputPath(context);
        FileSystem fs = path.getFileSystem(conf);
        FSDataOutputStream out = 
                fs.create(new Path(path,context.getJobName()));
        return new JsonRecordWriter(out);
    }

    private static class JsonRecordWriter extends 
          LineRecordWriter<Text,IntWritable>{
        boolean firstRecord = true;
        @Override
        public synchronized void close(TaskAttemptContext context)
                throws IOException {
            out.writeChar('{');
            super.close(null);
        }

        @Override
        public synchronized void write(Text key, IntWritable value)
                throws IOException {
            if (!firstRecord){
                out.writeChars(",\r\n");
                firstRecord = false;
            }
            out.writeChars("\"" + key.toString() + "\":\""+
                    value.toString()+"\"");
        }

        public JsonRecordWriter(DataOutputStream out) 
                throws IOException{
            super(out);
            out.writeChar('}');
        }
    }
}

如果您不想在输出中包含键,只需发出null,如:

context.write(NullWritable.get(), new IntWritable(sum));

hth公司

2o7dmzc5

2o7dmzc52#

如果您只想将json对象的列表写入hdfs而不关心key/value的概念,那么可以使用 NullWritable 在你的 Reducer 输出值:

public static class TokenCounterReducer extends Reducer<Text, Text, Text, NullWritable> {
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text value : values) {
            JSONObject jsn = new JSONObject();
            ....
            context.write(new Text(jsn.toString()), null);
        }
    }
}

请注意,您需要更改作业配置以执行以下操作:

job.setOutputValueClass(NullWritable.class);

通过将json对象写入hdfs,我了解到您希望存储json的字符串表示形式,我在上面描述了这一点。如果要将json的二进制表示形式存储到hdfs中,则需要使用 SequenceFile . 显然你可以自己写 Writable 但是我觉得如果你想用一个简单的字符串表示的话,这样更容易。

相关问题