读取由于mapreduce中的/n而分解为两行的记录

wfsdck30  于 2021-06-04  发布在  Hadoop
关注(0)|答案(2)|浏览(324)

我正在尝试编写一个自定义读取器,用于读取具有定义字段数的记录(位于两行中)。
例如

  1. 1,2,3,4("," can be there or not)
  2. ,5,6,7,8

我的要求是读取记录并将其作为单个记录推入mapper,如 {1,2,3,4,5,6,7,8} . 请提供一些信息。
更新:

  1. public boolean nextKeyValue() throws IOException, InterruptedException {
  2. if(key == null) {
  3. key = new LongWritable();
  4. }
  5. //Current offset is the key
  6. key.set(pos);
  7. if(value == null) {
  8. value = new Text();
  9. }
  10. int newSize = 0;
  11. int numFields = 0;
  12. Text temp = new Text();
  13. boolean firstRead = true;
  14. while(numFields < reqFields) {
  15. while(pos < end) {
  16. //Read up to the '\n' character and store it in 'temp'
  17. newSize = in.readLine( temp,
  18. maxLineLength,
  19. Math.max((int) Math.min(Integer.MAX_VALUE, end - pos),
  20. maxLineLength));
  21. //If 0 bytes were read, then we are at the end of the split
  22. if(newSize == 0) {
  23. break;
  24. }
  25. //Otherwise update 'pos' with the number of bytes read
  26. pos += newSize;
  27. //If the line is not too long, check number of fields
  28. if(newSize < maxLineLength) {
  29. break;
  30. }
  31. //Line too long, try again
  32. LOG.info("Skipped line of size " + newSize + " at pos " +
  33. (pos - newSize));
  34. }
  35. //Exit, since we're at the end of split
  36. if(newSize == 0) {
  37. break;
  38. }
  39. else {
  40. String record = temp.toString();
  41. StringTokenizer fields = new StringTokenizer(record,"|");
  42. numFields += fields.countTokens();
  43. //Reset 'value' if this is the first append
  44. if(firstRead) {
  45. value = new Text();
  46. firstRead = false;
  47. }
  48. if(numFields != reqFields) {
  49. value.append(temp.getBytes(), 0, temp.getLength());
  50. }
  51. else {
  52. value.append(temp.getBytes(), 0, temp.getLength());
  53. }
  54. }
  55. }
  56. if(newSize == 0) {
  57. key = null;
  58. value = null;
  59. return false;
  60. }
  61. else {
  62. return true;
  63. }
  64. }

}
这是我正在尝试的nextkeyvalue方法。但是Map器仍然没有得到正确的值。reqfields是4。

fafcakar

fafcakar1#

看看textinputformat是如何实现的。看看它的超类,还有fileinputformat。必须将textinputformat子类化为fileinputformat,并实现自己的记录处理。
在实现任何类型的文件输入格式时需要注意的是:
framework将分割文件,并给出要读取的文件片段的起始偏移量和字节长度。它很可能会将文件直接拆分到某个记录上。这就是为什么如果拆分中没有完全包含该记录,则读取器必须在拆分开始时跳过该记录的字节;如果拆分中没有完全包含该记录,则读取器必须读入拆分的最后一个字节以读取整个最后一条记录。
例如,textinoutformat将\n字符视为记录分隔符,因此当它获得拆分时,它将跳过字节直到第一个\n字符,并在拆分结束后读取到第\n个字符。
至于代码示例:
你需要问自己以下问题:假设你打开文件,找到一个随机的位置,开始向前阅读。如何检测记录的开始?我在您的代码中看不到任何与此相关的内容,没有它,您就无法编写好的输入格式,因为您不知道记录边界是什么。
现在仍然可以通过使issplittable(jobcontext,path)方法返回false,使输入格式从端到端读取整个文件。这使得文件完全由单个map任务读取,从而降低了并行性。
您的内部while循环似乎有问题,因为它正在检查过长的行并跳过它们。假设您的记录是使用多行写入的,那么在读取时可能会合并一条记录的一部分和另一条记录的另一部分。

6qftjkof

6qftjkof2#

字符串必须使用stringtokenizer进行标记化,而不是拆分。代码已更新为新的实现。

相关问题