SimpleTextLoader UDF in Pig

jutyujz0 · posted 2021-05-30 in Hadoop

I want to create a custom load function as a Pig UDF. I created a SimpleTextLoader by following https://pig.apache.org/docs/r0.11.0/udf.html, and I have successfully generated a jar file for this code, registered it in Pig, and run the Pig script. I do not know how to solve this problem; any help would be appreciated.
Below is my Java code:

package sampleloader; // matches the fully qualified name used in the Pig script below

import java.io.IOException;
import java.util.ArrayList;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.pig.LoadFunc;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

public class SimpleTextLoader extends LoadFunc {

    protected RecordReader in = null;
    private byte fieldDel = '\t';
    private ArrayList<Object> mProtoTuple = null;
    private TupleFactory mTupleFactory = TupleFactory.getInstance();
    private static final int BUFFER_SIZE = 1024;

    public SimpleTextLoader() {
    }

    // Accepts a single-character delimiter, or an escaped form such as
    // "\\t", "\\x09" (hex) or "\\u9" (decimal).
    public SimpleTextLoader(String delimiter) {
        this();
        if (delimiter.length() == 1) {
            this.fieldDel = (byte) delimiter.charAt(0);
        } else if (delimiter.length() > 1 && delimiter.charAt(0) == '\\') {
            switch (delimiter.charAt(1)) {
                case 't':
                    this.fieldDel = (byte) '\t';
                    break;
                case 'x':
                    fieldDel = Integer.valueOf(delimiter.substring(2), 16).byteValue();
                    break;
                case 'u':
                    this.fieldDel = Integer.valueOf(delimiter.substring(2)).byteValue();
                    break;
                default:
                    throw new RuntimeException("Unknown delimiter " + delimiter);
            }
        } else {
            throw new RuntimeException("PigStorage delimiter must be a single character");
        }
    }

    private void readField(byte[] buf, int start, int end) {
        if (mProtoTuple == null) {
            mProtoTuple = new ArrayList<Object>();
        }
        if (start == end) {
            // NULL value
            mProtoTuple.add(null);
        } else {
            mProtoTuple.add(new DataByteArray(buf, start, end));
        }
    }

    @Override
    public Tuple getNext() throws IOException {
        try {
            boolean notDone = in.nextKeyValue();
            if (notDone) {
                return null;
            }
            Text value = (Text) in.getCurrentValue();
            System.out.println("printing value" + value);
            byte[] buf = value.getBytes();
            int len = value.getLength();
            int start = 0;
            for (int i = 0; i < len; i++) {
                if (buf[i] == fieldDel) {
                    readField(buf, start, i);
                    start = i + 1;
                }
            }
            // pick up the last field
            readField(buf, start, len);
            Tuple t = mTupleFactory.newTupleNoCopy(mProtoTuple);
            mProtoTuple = null;
            System.out.println(t);
            return t;
        } catch (InterruptedException e) {
            int errCode = 6018;
            String errMsg = "Error while reading input";
            throw new ExecException(errMsg, errCode, PigException.REMOTE_ENVIRONMENT, e);
        }
    }

    @Override
    public void setLocation(String string, Job job) throws IOException {
        FileInputFormat.setInputPaths(job, string);
    }

    @Override
    public InputFormat getInputFormat() throws IOException {
        return new TextInputFormat();
    }

    @Override
    public void prepareToRead(RecordReader reader, PigSplit ps) throws IOException {
        in = reader;
    }
}
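For comparison, `RecordReader.nextKeyValue()` returns true while there is still a record to read, so loaders such as PigStorage and the documentation example return null from getNext() only when that call returns false. Here is a minimal sketch of that skeleton, reusing the fields and the readField() helper defined in the class above; it illustrates the RecordReader contract and is not the poster's exact code:

@Override
public Tuple getNext() throws IOException {
    try {
        // nextKeyValue() returns true while a record is available,
        // so null is returned only once the input split is exhausted.
        if (!in.nextKeyValue()) {
            return null;
        }
        Text value = (Text) in.getCurrentValue();
        byte[] buf = value.getBytes();
        int len = value.getLength();
        int start = 0;
        for (int i = 0; i < len; i++) {
            if (buf[i] == fieldDel) {
                readField(buf, start, i);
                start = i + 1;
            }
        }
        readField(buf, start, len); // last field on the line
        Tuple t = mTupleFactory.newTupleNoCopy(mProtoTuple);
        mProtoTuple = null;
        return t;
    } catch (InterruptedException e) {
        throw new ExecException("Error while reading input", 6018,
                PigException.REMOTE_ENVIRONMENT, e);
    }
}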

Below is my Pig script:

REGISTER /home/hadoop/netbeans/sampleloader/dist/sampleloader.jar
a = load '/input.txt' using sampleloader.SimpleTextLoader();
store a into 'output';
Answer (ff29svar):

You are using sampleloader.SimpleTextLoader(), which is just the empty constructor, so it does nothing.
Instead, use sampleloader.SimpleTextLoader(String delimiter), which does the actual work of splitting.
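
As a concrete example, assuming the fields in /input.txt are separated by commas, the delimiter is passed as a constructor argument in the load statement (the jar path and alias below mirror the script above):

REGISTER /home/hadoop/netbeans/sampleloader/dist/sampleloader.jar
a = load '/input.txt' using sampleloader.SimpleTextLoader(',');
store a into 'output';

The constructor shown in the question also accepts escaped forms such as '\\t' for tab-delimited input.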
