mysql新api连接

kse8i1jr  于 2021-06-03  发布在  Hadoop
关注(0)|答案(2)|浏览(253)

我试图在hadoop进程中将mysql设置为输入。如何在版本1.0.3中为hadoop-mysql连接使用dbinputformat类?通过hadoop-1.0.3/docs/api/中的jobconf配置作业不起作用。

// Create a new JobConf
JobConf job = new JobConf(new Configuration(), MyJob.class);

// Specify various job-specific parameters     
job.setJobName("myjob");

FileInputFormat.setInputPaths(job, new Path("in"));
FileOutputFormat.setOutputPath(job, new Path("out"));

job.setMapperClass(MyJob.MyMapper.class);
job.setCombinerClass(MyJob.MyReducer.class);
job.setReducerClass(MyJob.MyReducer.class);

job.setInputFormat(SequenceFileInputFormat.class);
job.setOutputFormat(SequenceFileOutputFormat.class);
atmip9wb

atmip9wb1#

看看这个帖子。它展示了如何将数据从map reduce接收到mysql数据库。

t30tvxxf

t30tvxxf2#

您需要执行以下操作(以典型的employee表为例):

JobConf conf = new JobConf(getConf(), MyDriver.class);
    conf.setInputFormat(DBInputFormat.class); 
    DBConfiguration.configureDB(conf, “com.mysql.jdbc.Driver”, “jdbc:mysql://localhost/mydatabase”); String [] fields = { “employee_id”, "name" };
    DBInputFormat.setInput(conf, MyRecord.class, “employees”, null /* conditions */, “employee_id”, fields); 
    ...
    // other necessary configuration
    JobClient.runJob(conf);

这个 configureDB() 以及 setInput() 调用配置 DBInputFormat . 第一个调用指定要使用的jdbc驱动程序实现以及要连接到哪个数据库。第二个调用指定要从数据库加载的数据。myrecord类是java中读取数据的类,“employees”是要读取的表的名称。“employee\u id”参数指定表的主键,用于对结果排序。下面的“输入格式的限制”一节解释了为什么这是必要的。最后,fields数组列出了要读取的表的哪些列。重载的定义 setInput() 允许您指定要读取的任意sql查询。
打电话后 configureDB() 以及 setInput() ,您应该像往常一样配置作业的其余部分,设置mapper和reducer类,指定要从中读取的任何其他数据源(例如hdfs中的数据集)和其他特定于作业的参数。
您需要创建自己的 Writable -类似于以下内容(将id和name视为表字段):

class MyRecord implements Writable, DBWritable { 
    long id; 
    String name; 

    public void readFields(DataInput in) throws IOException { 
        this.id = in.readLong(); 
        this.name = Text.readString(in); 
        } 

    public void readFields(ResultSet resultSet) throws SQLException { 
        this.id = resultSet.getLong(1); 
        this.name = resultSet.getString(2); } 

    public void write(DataOutput out) throws IOException { 
        out.writeLong(this.id); 
        Text.writeString(out, this.name); } 

    public void write(PreparedStatement stmt) throws SQLException { 
        stmt.setLong(1, this.id); 
        stmt.setString(2, this.name); } 
    }

然后,Map器接收dbwritable实现的一个示例作为其输入值。输入键是数据库提供的行id;您很可能会放弃此值。

public class MyMapper extends MapReduceBase implements Mapper<LongWritable, MyRecord, LongWritable, Text> { 
public void map(LongWritable key, MyRecord val, OutputCollector<LongWritable, Text> output, Reporter reporter) throws IOException { 
// Use val.id, val.name here 
output.collect(new LongWritable(val.id), new Text(val.name)); 
} 
}

更多信息:请阅读以下链接(我答案的实际来源):http://blog.cloudera.com/blog/2009/03/database-access-with-hadoop/

相关问题