我正在编写一个hadoop作业,它使用cassandra(v2.0.11)作为输入和输出。
在我的hadoop工作中,我定义了输入列族:
ConfigHelper.setInputColumnFamily(job.getConfiguration(), KEYSPACE, INPUT_COLUMN_FAMILY, WIDE_ROWS);
哪里 WIDE_ROWS=true
. 我还设置了 CqlInputFormat
作为阅读课:
job.setInputFormatClass(CqlInputFormat.class);
``` `CqlInputFormat` 使用 `CqlRecordReader` 写在哪里(链接):
// Because the old Hadoop API wants us to write to the key and value
// and the new asks for them, we need to copy the output of the new API
// to the old. Thus, expect a small performance hit.
// And obviously this wouldn't work for wide rows. But since ColumnFamilyInputFormat
// and ColumnFamilyRecordReader don't support them, it should be fine for now.
public boolean next(Long key, Row value) throws IOException
{
if (nextKeyValue())
{
((WrappedRow)value).setRow(getCurrentValue());
return true;
}
return false;
}
我完全不明白。。。当我检查的时候 `ColumnFamilyRecordReader` 代码(链接)似乎是使用宽行。。。
做 `CqlInputFormat` 真的支持宽行吗?你能解释一下吗?
1条答案
按热度按时间mznpcxlj1#
我对此进行了研究,并意识到cql“转置”了宽行,以便将每一列分别送入map函数(
CqlInputFormat
运行cql查询以从cassandra节点(node)获取数据。这种方法在处理非常宽的行时不会导致oom异常,因为
CqlInputFormat
使用cql中可用的分页机制。只需要CqlConfigHelper.getInputCQLPageRowSize
每页列数。不幸的是,在我的例子中,这是没有效率的,因为我想在每个行键上执行“groupby”操作来计算列数。在一个由数千列组成的循环中增加一个计数器的速度比只增加一个要慢
columns.size()
(如果有这种可能性的话)。更多信息:
http://www.datastax.com/dev/blog/cql3-for-cassandra-expertshttpshttp://issues.apache.org/jira/browse/cassandra-3264