我正在尝试使用以下代码在hbase中存储数据:
val x = Seq((1, ("a", "A")), (1, ("a", "AA")), (2, ("d", "D")), (2, ("d", "DD")))
val f = sc.parallelize(x)
val z = f.groupByKey()
z.collectAsMap().foreach(elem => {
var rowKey = elem._1.toString()
var p = new Put(rowKey.getBytes())
elem._2.foreach(innerElem => {
var col = innerElem._1
var value = innerElem._2
p.add("cf".getBytes(), new String(col).getBytes(), new String(value).getBytes())
table.put(p)
})
})
table.flushCommits()
我从hbase shell获得以下输出:
ROW COLUMN+CELL
1 column=cf:a, timestamp=1487917201238, value=AA
2 column=cf:d, timestamp=1487917201226, value=DD
但我想得到:
ROW COLUMN+CELL
1 column=cf:a, timestamp=1487917201238, value=A
1 column=cf:a, timestamp=148791720123X, value=AA
2 column=cf:d, timestamp=1487917201226, value=D
2 column=cf:d, timestamp=148791720122X, value=DD
我的代码重写了特定列的第一个值,并只存储该列的最后一个值。我要存储特定列的每个值。
当我搬家的时候 var p = new Put(rowKey.getBytes())
在scala shell中的第二个foreach循环中,我得到了以下内容:
scala>
| z.collectAsMap().foreach(elem => {
| val rowKey = elem._1.toString()
|
| elem._2.foreach(innerElem => {
| Display all 620 possibilities? (y or n)
| ew Put(rowKey.getBytes())
| var col = innerElem._1
| var value = innerElem._2
| p.add("cf".getBytes(), new String(col).getBytes(), new String(value).getBytes())
| table.put(p)
| })
| })
<console>:49: error: not found: value ew
ew Put(rowKey.getBytes())
^
<console>:52: error: not found: value p
p.add("cf".getBytes(), new String(col).getBytes(), new String(value).getBytes())
^
<console>:53: error: not found: value p
table.put(p)
^
1条答案
按热度按时间f8rj6qna1#
你的问题在于
Put
对象。把它看作一个哈希表。只需重写第二个forloop
迭代。创建单独的Put
每次迭代中的对象,它会很好。