我和Cassandra合作的经验不多,如果我的方法不对,请原谅。
我正在尝试用map reduce在cassandra进行批量加载
基本上是字数计算的例子
参考文献:http://henning.kropponline.de/2012/11/15/using-cassandra-hadoopbulkoutputformat/
我已经把简单的hadoop wordcountMap器的例子,并稍微修改了驱动程序代码和减速机根据上述例子。
我也成功地生成了输出文件。现在我的疑问是如何执行Cassandra部分加载?我的方法有什么不同吗?
请给我建议。
这是驱动程序代码的一部分
Job job = new Job();
job.setJobName(getClass().getName());
job.setJarByClass(CassaWordCountJob.class);
Configuration conf = job.getConfiguration();
conf.set("cassandra.output.keyspace", "test");
conf.set("cassandra.output.columnfamily", "words");
conf.set("cassandra.output.partitioner.class", "org.apache.cassandra.dht.RandomPartitioner");
conf.set("cassandra.output.thrift.port","9160"); // default
conf.set("cassandra.output.thrift.address", "localhost");
conf.set("mapreduce.output.bulkoutputformat.streamthrottlembits", "400");
job.setMapperClass(CassaWordCountMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
job.setReducerClass(CassaWordCountReducer.class);
FileOutputFormat.setOutputPath(job, new Path("/home/user/Desktop/test/cassandra"));
MultipleOutputs.addNamedOutput(job, "reducer", BulkOutputFormat.class, ByteBuffer.class, List.class);
return job.waitForCompletion(true) ? 0 : 1;
Map器与普通的wordcountMap器相同,它只是标记并发出word,1
reducer类的形式如下
public class CassaWordCountReducer extends
Reducer<Text, IntWritable, ByteBuffer, List<Mutation>> {
@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
List<Mutation> columnsToAdd = new ArrayList<Mutation>();
Integer wordCount = 0;
for(IntWritable value : values) {
wordCount += value.get();
}
Column countCol = new Column(ByteBuffer.wrap("count".getBytes()));
countCol.setValue(ByteBuffer.wrap(wordCount.toString().getBytes()));
countCol.setTimestamp(new Date().getTime());
ColumnOrSuperColumn wordCosc = new ColumnOrSuperColumn();
wordCosc.setColumn(countCol);
Mutation countMut = new Mutation();
countMut.column_or_supercolumn = wordCosc;
columnsToAdd.add(countMut);
context.write(ByteBuffer.wrap(key.toString().getBytes()), columnsToAdd);
}
}
1条答案
按热度按时间4dc9hkyq1#
要批量装载到cassandra,我建议您从datastax看这篇文章。基本上你需要做两件事批量装载:
您的输出数据本机不适合cassandra,您需要将其转换为sstables。
一旦你有了sstables,你就需要能够将它们流到cassandra中。当然,您不只是想将每个sstable复制到每个节点,您只想将数据的相关部分复制到每个节点
在您使用
BulkOutputFormat
,当它使用sstableloader
在幕后。我从来没用过MultipleOutputs
,但它应该可以正常工作。我认为你的错误在于你没有使用
MultipleOutputs
正确:你仍然在做一个context.write
,当你真的应该写信给你的MultipleOutputs
对象。你现在这样做,因为你是在给普通人写信Context
,它将以默认的输出格式TextOutputFormat
而不是你在书中定义的那个MultipleOutputs
. 有关如何使用MultipleOutputs
在你的减速机里。一旦写入到正确的输出格式
BulkOutputFormat
正如您所定义的,您的sstables应该从集群中的每个节点创建并流式传输到cassandra—您不需要任何额外的步骤,输出格式将为您处理它。另外,我建议看这篇文章,他们也解释了如何使用
BulkOutputFormat
,但他们用的是ConfigHelper
您可能需要查看它,以便更轻松地配置cassandra端点。