I am running standalone hbase-1.2.4 on an Ubuntu 17.04 machine. I am trying to write a map reduce job in Java that extracts the metadata of a table (i.e. the column family identifiers together with the column identifiers) and sums up the number of records that share the same schema. I found many examples and copied some code from http://www.informit.com/articles/article.aspx?p=2262143&seqnum=2 , and this article, which deals with a similar problem, http://sujee.net/2011/04/10/hbase-map-reduce-example/ , also seemed helpful. My custom code compiles and runs, but my tests show that the reducer never runs, and in the end I get no results. I am attaching the code, which contains some comments pointing out the critical places, along with some of my trials.
I hope someone can give me the hint I need.
Code
import java.io.*;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.fs.Path;
//
import java.util.ArrayList;
import java.util.List;
//
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
/**
* determines the number of identically structured records in a table
*
**/
public class MetaSummary {
static class Mapper1 extends TableMapper<Text, IntWritable> {
// if possible, pass the table name as a parameter later (String tableName)
private int numRecords = 0;
private static final IntWritable one = new IntWritable(1);
// added by MU
private String l_row ="";
private String l_family;
private String l_qualifier;
private String l_out ="";
private byte[] l_bytearray;
private Text l_text;
private String l_mapout ="";
@Override
public void map(ImmutableBytesWritable row, Result values, Context context) throws IOException {
// For each record, build the string columnFamily:column for every cell;
// the concatenation of these strings becomes the mapper's output key.
// The associated value is always 1.
// The values of equal keys are to be added up later in the reduce phase.
// Task: assign to the user key the string that describes the record structure
//
// the user key is composed of the column family identifiers along with the respective column names
l_out="";
for(KeyValue kv : values.raw()){
l_family = new String(kv.getFamily());
l_qualifier = new String(kv.getQualifier());
l_out = l_out+l_family+":";
if (l_qualifier == null){ l_qualifier = "<null>"; }
if (l_qualifier.equals("")){ l_qualifier = "<leer>"; }
l_out = l_out +l_qualifier + " ";
}
l_out = l_out.trim();
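// note: l_mapout is an instance field, so the next line accumulates the keys of
// all rows this mapper has seen so far, not just the key of the current row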
l_mapout = l_mapout+ l_out + " ";
l_text = new Text(l_mapout);
// the following code is for testing purposes only, to check whether this part was running
try (PrintStream out = new PrintStream(new FileOutputStream("mapout.txt"))) {
out.print(l_mapout); }
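// (each call truncates and rewrites mapout.txt, so only the output of the last map() call survives)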
try {
//context.write(l_userkey, one); // former trials
// context.write(l_out, one);
context.write(l_text, one);
}
catch (InterruptedException e) {
throw new IOException(e);
}
}
}
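// NOTE: Reducer1 below declares ImmutableBytesWritable as its input key type, while its
// reduce() takes a Text key; if I understand Java generics correctly, that makes reduce()
// an overload rather than an override, so the framework would silently call the inherited
// identity reduce instead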
static class Reducer1 extends TableReducer<ImmutableBytesWritable, IntWritable, ImmutableBytesWritable>
//public static class Reducer1 extends TableReducer<Text, IntWritable, ImmutableBytesWritable>
{
//public void reduce(String key, Iterable<IntWritable> values, Context context)
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
// the following code is for testing purposes only, to check whether this part was running
try (PrintStream out = new PrintStream(new FileOutputStream("red1Anfang.txt"))) {
out.print("in Reducer1.reduce before for ..."); }
for (IntWritable val : values) {
sum += val.get();
System.out.println(sum);
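// note: the Put and the context.write below sit inside the loop,
// so they run once per value instead of once per key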
Put put = new Put(key.getBytes());
// Put put = new Put(Bytes.toBytes(key.toString())); // former trials
// addColumn(byte[] family, byte[] qualifier, byte[] value)
put.addColumn(Bytes.toBytes("details"), Bytes.toBytes("total"), Bytes.toBytes(sum));
context.write(new ImmutableBytesWritable(key.getBytes()), put);
}
}
}
// Reducer1 did not yield any results, so the next trial was to write the output to the file system,
// which should be done by Reducer2,
// which does not yield any results either
static class Reducer2 extends Reducer<Text, IntWritable, Text, IntWritable>
{
/*public Reducer2() {
}*/
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
// the following code is for testing purposes only, to check whether this part was running
try (PrintStream out = new PrintStream(new FileOutputStream("red2Anfang.txt"))) {
out.print("in Reducer2.reduce Anfang"); }
// the following code is for testing purposes only, to check whether this part was running
try (PrintStream out = new PrintStream(new FileOutputStream("redlaeuft.txt"))) {
out.print("reduce läuft"); }
String sumstr="";
int sum = 0;
// the following code is for testing purposes only, to check whether this part was running
try (PrintStream out = new PrintStream(new FileOutputStream("redoutvorfor.txt"))) {
out.print("in Reducer2.reduce vor Schleife"); }
for (IntWritable val : values) {
sum += val.get();
// the following lines are for testing purposes only
sumstr = new Integer(sum).toString();
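// note: concatenating key.getBytes() prints the byte array's object reference
// (something like [B@1e2f...), not the key text; key.toString() would give readable output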
try (PrintStream out = new PrintStream(new FileOutputStream("redout.txt"))) {
out.print(key.getBytes() + " " + sumstr); }
// Write out the key and the sum; which of the following should work?
// context.write( new ImmutableBytesWritable(key.getBytes()), new IntWritable( sum ) );
//context.write( key, new IntWritable( sum ) );
// Even the simplest output does not work
context.write (new Text("abc"), new IntWritable(1));
}
}
}
public static void main(String[] args) throws Exception {
// HBaseConfiguration conf = new HBaseConfiguration(); // trial 1
Configuration conf = HBaseConfiguration.create();
Path output=new Path("Output");
// Job job = new Job(conf, "HBase_MetaSummary"); // trial 1
Job job = Job.getInstance(conf, "HBase_MetaSummary");
job.setJarByClass(MetaSummary.class);
Scan scan = new Scan();
TableMapReduceUtil.initTableMapperJob("videodaten", scan, Mapper1.class, ImmutableBytesWritable.class,
IntWritable.class, job);
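// as far as I can tell, the two class arguments above declare the mapper's output
// key/value types; Mapper1 actually writes Text keys, so perhaps this should be
// Text.class instead of ImmutableBytesWritable.class?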
// job.setMapperClass(Mapper1.class); // does not change anything
job.setReducerClass(Reducer2.class);
// the following commented-out lines should have caused the reduce results to be written to an HBase table
// precondition: the table was created beforehand :: create 'meta_summary', {NAME => 'details', VERSIONS => 1}
//TableMapReduceUtil.initTableReducerJob("meta_summary", Reducer1.class, job);
// instead I try to write into a text file, which should do just as well
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setOutputFormatClass( TextOutputFormat.class );
job.setNumReduceTasks( 1 );
FileOutputFormat.setOutputPath(job, output);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
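For reference, below is a minimal sketch of the job wiring as I would expect it to look based on the cited examples, assuming the output classes handed to initTableMapperJob have to match what Mapper1 actually emits (Text keys, IntWritable values). This is an untested assumption on my part, not verified code:

public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Job job = Job.getInstance(conf, "HBase_MetaSummary");
    job.setJarByClass(MetaSummary.class);
    Scan scan = new Scan();
    // declare the mapper output classes to match what Mapper1.map() writes:
    // Text keys and IntWritable values, not ImmutableBytesWritable
    TableMapReduceUtil.initTableMapperJob("videodaten", scan, Mapper1.class,
            Text.class, IntWritable.class, job);
    job.setReducerClass(Reducer2.class);
    // final output of the reducer: one line per schema string with its count
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    job.setNumReduceTasks(1);
    FileOutputFormat.setOutputPath(job, new Path("Output"));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}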