spark流媒体-hbase批量加载

k0pti3hp  于 2021-05-29  发布在  Hadoop
关注(0)|答案(1)|浏览(384)

我目前正在使用python将csv数据批量加载到hbase表中,并且在使用python编写适当的hfiles时遇到了问题 saveAsNewAPIHadoopFile 我的代码目前如下所示:

  1. def csv_to_key_value(row):
  2. cols = row.split(",")
  3. result = ((cols[0], [cols[0], "f1", "c1", cols[1]]),
  4. (cols[0], [cols[0], "f2", "c2", cols[2]]),
  5. (cols[0], [cols[0], "f3", "c3", cols[3]]))
  6. return result
  7. def bulk_load(rdd):
  8. conf = {#Ommitted to simplify}
  9. keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
  10. valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"
  11. load_rdd = rdd.flatMap(lambda line: line.split("\n"))\
  12. .flatMap(csv_to_key_value)
  13. if not load_rdd.isEmpty():
  14. load_rdd.saveAsNewAPIHadoopFile("file:///tmp/hfiles" + startTime,
  15. "org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2",
  16. conf=conf,
  17. keyConverter=keyConv,
  18. valueConverter=valueConv)
  19. else:
  20. print("Nothing to process")

运行此代码时,出现以下错误: java.io.IOException: Added a key not lexically larger than previous. Current cell = 10/f1:c1/1453891407213/Minimum/vlen=1/seqid=0, lastCell = /f1:c1/1453891407212/Minimum/vlen=1/seqid=0 at org.apache.hadoop.hbase.io.hfile.AbstractHFileWriter.checkKey(AbstractHFileWriter.java:204) 因为这个错误表明键是问题所在,所以我从rdd中获取了元素,它们如下所示(格式化为可读性)

  1. [(u'1', [u'1', 'f1', 'c1', u'A']),
  2. (u'1', [u'1', 'f2', 'c2', u'1A']),
  3. (u'1', [u'1', 'f3', 'c3', u'10']),
  4. (u'2', [u'2', 'f1', 'c1', u'B']),
  5. (u'2', [u'2', 'f2', 'c2', u'2B']),
  6. (u'2', [u'2', 'f3', 'c3', u'9']),

. . .

  1. (u'9', [u'9', 'f1', 'c1', u'I']),
  2. (u'9', [u'9', 'f2', 'c2', u'3C']),
  3. (u'9', [u'9', 'f3', 'c3', u'2']),
  4. (u'10', [u'10', 'f1', 'c1', u'J']),
  5. (u'10', [u'10', 'f2', 'c2', u'1A']),
  6. (u'10', [u'10', 'f3', 'c3', u'1'])]

这是一个完美的匹配我的csv,在正确的顺序。据我所知,在hbase中,键是由{row,family,timestamp}定义的。行和族的组合是唯一的,对于我的数据中的所有条目都是单调递增的,我无法控制时间戳(这是我能想象的唯一问题)
有谁能就如何避免这种问题给我建议吗?

fykwrbwg

fykwrbwg1#

这只是我的一个愚蠢的错误,我觉得有点愚蠢。按字典顺序,顺序应该是1,10,2,3。。。8, 9. 确保装货前正确订购的最简单方法是:

  1. rdd.sortByKey(true);

我希望我至少能挽救一个人的头痛。

相关问题