wordcount与apache crunch集成到hbase独立版

yqyhoc1h  于 2021-05-30  发布在  Hadoop
关注(0)|答案(1)|浏览(469)

目前我正在评估apachecrunch。我遵循了一个简单的wordcount mapreduce作业示例:之后我尝试将结果保存到一个独立的hbase中。hbase正在运行(使用jps和hbase shell检查),如下所述:http://hbase.apache.org/book/quickstart.html
现在我采用这个例子来编写hbase:

Pipeline pipeline = new MRPipeline(WordCount.class,getConf());
PCollection<String> lines = pipeline.readTextFile(inputPath);
PTable<String,Long> counts = noStopWords.count();
pipeline.write(counts, new HBaseTarget("wordCountOutTable");
PipelineResult result = pipeline.done();

我得到一个例外:“exception:java.lang.illegalargumentexception:hbasetarget仅支持put和delete“
有什么线索吗?

am46iovg

am46iovg1#

ptable可能是pcollection,但hbasetarget只能处理put或delete对象。因此,必须将ptable转换为pcollection,其中集合的每个元素都是put或delete。看一看这些关键的例子。
转换示例如下所示:

public PCollection<Put> createPut(final PTable<String, String> counts) {
   return counts.parallelDo("Convert to puts", new DoFn<Pair<String, String>, Put>() {
     @Override
     public void process(final Pair<String, String> input, final Emitter<Put> emitter) {
       Put put;
       // input.first is used as row key
       put = new Put(Bytes.toBytes(input.first())); 
       // the value (input.second) is added with its family and qualifier
       put.add(COLUMN_FAMILY_TARGET, COLUMN_QUALIFIER_TARGET_TEXT, Bytes.toBytes(input.second())); 
       emitter.emit(put);
     }
   }, Writables.writables(Put.class));
 }

相关问题