public class Main
{
public static void main(String[] args) throws Exception
{
Configuration conf = new Configuration();
DBConfiguration.configureDB(conf,
"com.mysql.jdbc.Driver", // driver class
"jdbc:mysql://localhost:3306/testDb", // db url
"user", // username
"password"); //password
Job job = new Job(conf);
job.setJarByClass(Main.class);
job.setMapperClass(Map.class); // your mapper - not shown in this example
job.setReducerClass(Reduce.class);
job.setMapOutputKeyClass(Text.class); // your mapper - not shown in this example
job.setMapOutputValueClass(IntWritable.class); // your mapper - not shown in this example
job.setOutputKeyClass(DBOutputWritable.class); // reducer's KEYOUT
job.setOutputValueClass(NullWritable.class); // reducer's VALUEOUT
job.setInputFormatClass(...);
job.setOutputFormatClass(DBOutputFormat.class);
DBInputFormat.setInput(...);
DBOutputFormat.setOutput(
job,
"output", // output table name
new String[] { "name", "count" } //table columns
);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
1条答案
按热度按时间pkln4tw61#
这个博客上展示了一个很好的例子,我试过了,效果非常好。我引用了代码中最重要的部分。
首先,必须创建一个表示要存储的数据的类。类必须实现dbwritable接口:
在减速器中创建以前定义的类的对象:
最后,必须配置到db的连接(不要忘记在类路径上添加db连接器),并注册Map器和reducer的输入/输出数据类型。