hadoophdfsmapreduce输出到mongodb

3wabscal  于 2021-06-03  发布在  Hadoop
关注(0)|答案(3)|浏览(473)

我想编写一个java程序,从hdfs读取输入,使用mapreduce进行处理,并将输出写入mongodb。
以下是场景:
我有一个hadoop集群,它有3个数据节点。
java程序从hdfs读取输入,并使用mapreduce进行处理。
最后,将结果写入mongodb。
实际上,从hdfs读取并用mapreduce处理它是很简单的。但是我在把结果写进mongodb的时候遇到了麻烦。是否支持java api将结果写入mongodb?另一个问题是,由于它是一个hadoop集群,因此我们不知道哪个datanode将运行reducer任务并生成结果,是否可以将结果写入安装在特定服务器上的mongodb中?
如果我想将结果写入hdfs,代码如下:

@Override
public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException 
{
    long sum = 0;
    for (LongWritable value : values) 
    {
        sum += value.get();
    }

    context.write(new Text(key), new LongWritable(sum));
}

现在我想把结果写进mongodb而不是hdfs,我该怎么做呢?

lc8prwob

lc8prwob1#

对。你照常给蒙哥写信。事实上,mongodb被设置为在shard上运行,这是一个隐藏的细节。

j8yoct9x

j8yoct9x2#

我花了一上午的时间来实现同样的场景。我的解决方案是:
创建三个类:
experience.java:用于作业配置和提交
mymap.java:Map器类
myreduce.java:reducer类

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.mongodb.hadoop.io.BSONWritable;
import com.mongodb.hadoop.mapred.MongoOutputFormat;

public class Experiment extends Configured implements Tool{

     public int run(final String[] args) throws Exception {
        final Configuration conf = getConf();
        conf.set("mongo.output.uri", args[1]);

        final JobConf job = new JobConf(conf);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        job.setJarByClass(Experiment.class);

        job.setInputFormat(org.apache.hadoop.mapred.TextInputFormat.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputFormat(MongoOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BSONWritable.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        JobClient.runJob(job);

        return 0;
    }

    public static void main(final String[] args) throws Exception{

        int res = ToolRunner.run(new TweetPerUserToMongo(), args);
        System.exit(res);
    }
}

从集群运行实验类时,将输入两个参数。第一个参数是来自hdfs位置的输入源,第二个参数是用于保存结果的mongodburi。下面是一个示例调用。假设您的experience.java位于包名org.example下。

sudo -u hdfs hadoop jar ~/jar/myexample.jar org.example.Experiment myfilesinhdfs/* mongodb://192.168.0.1:27017/mydbName.myCollectionName

这也许不是最好的方法,但对我来说确实有用。

uelo1irk

uelo1irk3#

你想要什么«用于hadoop的mongodb连接器». 举例说明。
只是在reducer中添加代码,作为副作用,将数据插入到数据库中是很诱人的。避免这种诱惑。使用连接器而不是仅仅插入数据作为reducer类的副作用的一个原因是推测性执行:hadoop有时可以并行运行两个完全相同的reduce任务,这可能导致无关的插入和重复数据。

相关问题