Usage and code examples of the org.apache.hadoop.mapred.RecordReader class


This article collects Java code examples for the org.apache.hadoop.mapred.RecordReader class and shows how the class is used in practice. The examples were extracted from selected open-source projects hosted on GitHub, Stack Overflow, Maven and similar platforms, and should serve as useful references. Details of the RecordReader class:
Package path: org.apache.hadoop.mapred.RecordReader
Class name: RecordReader

Introduction to RecordReader

RecordReader reads <key, value> pairs from an InputSplit.

A RecordReader typically converts the byte-oriented view of the input provided by the InputSplit and presents a record-oriented view to the Mapper and Reducer tasks for processing. It therefore takes on the responsibility of handling record boundaries and of presenting the tasks with keys and values.
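
As a quick orientation before the project snippets, here is a minimal, self-contained sketch of the typical read loop against this old "mapred" API: ask the InputFormat for splits, obtain one RecordReader per split, create reusable key/value objects with createKey()/createValue(), iterate with next() until it returns false, and close the reader. This is an illustrative sketch only; the choice of TextInputFormat, the LongWritable/Text types, the RecordReaderSketch class name and the command-line input path are assumptions, not something taken from the examples below.

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;

public class RecordReaderSketch {
  public static void main(String[] args) throws IOException {
    JobConf conf = new JobConf();
    FileInputFormat.setInputPaths(conf, new Path(args[0])); // input path is an assumption
    TextInputFormat format = new TextInputFormat();
    format.configure(conf);
    InputSplit[] splits = format.getSplits(conf, 1);
    for (InputSplit split : splits) {
      // the InputFormat hands out one RecordReader per split
      RecordReader<LongWritable, Text> reader =
          format.getRecordReader(split, conf, Reporter.NULL);
      try {
        // reuse a single key/value pair; next() refills them and
        // returns false once the split is exhausted
        LongWritable key = reader.createKey();
        Text value = reader.createValue();
        while (reader.next(key, value)) {
          System.out.println(key.get() + "\t" + value);
        }
      } finally {
        reader.close();
      }
    }
  }
}

With Hadoop on the classpath this can be run as a plain Java program, passing an input file or directory as the first argument.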

Code examples

Code example source: apache/incubator-gobblin

/**
 * {@inheritDoc}.
 *
 * This method will throw a {@link ClassCastException} if type {@link #<D>} is not compatible
 * with type {@link #<K>} if keys are supposed to be read, or if it is not compatible with type
 * {@link #<V>} if values are supposed to be read.
 */
@Override
@SuppressWarnings("unchecked")
public D readRecord(@Deprecated D reuse) throws DataRecordException, IOException {
 K key = this.recordReader.createKey();
 V value = this.recordReader.createValue();
 if (this.recordReader.next(key, value)) {
  return this.readKeys ? (D) key : (D) value;
 }
 return null;
}

Code example source: apache/hive

protected org.apache.hadoop.mapred.RecordReader setReaderAtSplit(int splitNum)
  throws IOException {
 JobConf localJc = getLocalFSJobConfClone(jc);
 currentSplitPointer = splitNum;
 if ( rr != null ) {
  rr.close();
 }
 // open record reader to read next split
 rr = inputFormat.getRecordReader(inputSplits[currentSplitPointer], jobCloneUsingLocalFs,
   reporter);
 currentSplitPointer++;
 return rr;
}

Code example source: prestodb/presto

@Override
public float getProgress()
        throws IOException
{
    return delegate.getProgress();
}

Code example source: apache/hive

PassThruOffsetReader(RecordReader sourceReader) {
 this.sourceReader = sourceReader;
 key = sourceReader.createKey();
 value = (Writable)sourceReader.createValue();
}

Code example source: apache/flink

@Override
public void open(HadoopInputSplit split) throws IOException {
  // enforce sequential open() calls
  synchronized (OPEN_MUTEX) {
    this.recordReader = this.mapredInputFormat.getRecordReader(split.getHadoopInputSplit(), jobConf, new HadoopDummyReporter());
    if (this.recordReader instanceof Configurable) {
      ((Configurable) this.recordReader).setConf(jobConf);
    }
    key = this.recordReader.createKey();
    value = this.recordReader.createValue();
    this.fetched = false;
  }
}

Code example source: apache/hive

Configuration conf = new Configuration();
OrcOutputFormat of = new OrcOutputFormat();
FileSystem fs = FileSystem.getLocal(conf);
Path root = new Path(tmpDir, "testRecordReaderDelta").makeQualified(fs);
fs.delete(root, true);
ObjectInspector inspector;
// ... (the original excerpt elides the inspector/serde setup, the writing of the
// delta files, and the creation of the JobConf "job", the input format "inf"
// and the expected values array "values")
job.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true);
InputSplit[] splits = inf.getSplits(job, 5);
assertEquals(2, splits.length);
org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct> rr;
for (int j = 0; j < splits.length; j++) {
  InputSplit split = splits[j];
  rr = inf.getRecordReader(split, job, Reporter.NULL);
  OrcStruct row = rr.createValue();
  // read back every expected row of this split and check its first field
  for (int i = 0; i < values[j].length; ++i) {
    System.out.println("Checking " + i);
    String msg = "split[" + j + "] at i=" + i;
    assertEquals(msg, true, rr.next(NullWritable.get(), row));
    assertEquals(msg, values[j][i], row.getFieldValue(0).toString());
  }
  // the reader must be exhausted once all expected rows have been read
  assertEquals(false, rr.next(NullWritable.get(), row));
}

Code example source: ZuInnoTe/hadoopoffice

@Test
public void readExcelInputFormatExcel2003Empty() throws IOException {
  JobConf job = new JobConf(defaultConf);
  ClassLoader classLoader = getClass().getClassLoader();
  String fileName = "excel2003empty.xls";
  String fileNameSpreadSheet = classLoader.getResource(fileName).getFile();
  Path file = new Path(fileNameSpreadSheet);
  FileInputFormat.setInputPaths(job, file);
  // set locale to the one of the test data
  job.set("hadoopoffice.locale.bcp47", "de");
  ExcelFileInputFormat format = new ExcelFileInputFormat();
  format.configure(job);
  InputSplit[] inputSplits = format.getSplits(job, 1);
  assertEquals(1, inputSplits.length, "Only one split generated for Excel file");
  RecordReader<Text, ArrayWritable> reader = format.getRecordReader(inputSplits[0], job, reporter);
  assertNotNull(reader, "Format returned  null RecordReader");
  Text spreadSheetKey = new Text();
  ArrayWritable spreadSheetValue = new ArrayWritable(SpreadSheetCellDAO.class);
  assertTrue(reader.next(spreadSheetKey, spreadSheetValue), "Input Split for Excel file contains row 1");
  assertEquals(0, spreadSheetValue.get().length, "Input Split for Excel file contain row 1 and is empty");
  assertFalse(reader.next(spreadSheetKey, spreadSheetValue),
      "Input Split for Excel file contains no further row");
}

Code example source: org.apache.hadoop/hadoop-mapred-test

@SuppressWarnings("unchecked") // InputFormat instantiation
static long readBench(JobConf conf) throws IOException {
 InputFormat inf = conf.getInputFormat();
 final String fn = conf.get("test.filebench.name", "");
 Path pin = new Path(FileInputFormat.getInputPaths(conf)[0], fn);
 FileStatus in = pin.getFileSystem(conf).getFileStatus(pin);
 RecordReader rr = inf.getRecordReader(new FileSplit(pin, 0, in.getLen(), 
                    (String[])null), conf, Reporter.NULL);
 try {
  Object key = rr.createKey();
  Object val = rr.createValue();
  Date start = new Date();
  while (rr.next(key, val));
  Date end = new Date();
  return end.getTime() - start.getTime();
 } finally {
  rr.close();
 }
}

Code example source: ZuInnoTe/hadoopcryptoledger

@Test
public void readEthereumBlockInputFormatBlock3346406() throws IOException, EthereumBlockReadException, ParseException, InterruptedException {
  JobConf job = new JobConf(defaultConf);
  ClassLoader classLoader = getClass().getClassLoader();
  String fileName = "eth3346406.bin";
  String fileNameBlock = classLoader.getResource("testdata/" + fileName).getFile();
  Path file = new Path(fileNameBlock);
  FileInputFormat.setInputPaths(job, file);
  EthereumBlockFileInputFormat format = new EthereumBlockFileInputFormat();
  format.configure(job);
  InputSplit[] inputSplits = format.getSplits(job, 1);

  assertEquals(1, inputSplits.length, "Only one split generated for genesis block");
  RecordReader<BytesWritable, EthereumBlock> reader = format.getRecordReader(inputSplits[0], job, reporter);
  assertNotNull(reader, "Format returned  null RecordReader");
  BytesWritable key = new BytesWritable();
  EthereumBlock block = new EthereumBlock();
  assertTrue(reader.next(key, block), "Input Split for block 3346406 contains at least one block");
  assertEquals(7, block.getEthereumTransactions().size(), "Block 3346406 must have 7 transactions");
  assertFalse(reader.next(key, block),
      "No further blocks in block 3346406");
  reader.close();
}

Code example source: apache/hive

private void writeThenReadByRecordReader(int intervalRecordCount,
    int writeCount, int splitNumber, long minSplitSize, CompressionCodec codec)
    throws IOException {
  Path testDir = new Path(System.getProperty("test.tmp.dir", ".")
      + "/mapred/testsmallfirstsplit");
  Path testFile = new Path(testDir, "test_rcfile");
  fs.delete(testFile, true);
  Configuration cloneConf = new Configuration(conf);
  RCFileOutputFormat.setColumnNumber(cloneConf, bytesArray.length);
  // ... (the original excerpt elides writing "writeCount" records to the RCFile)
  JobConf jonconf = new JobConf(cloneConf);
  jonconf.set("mapred.input.dir", testDir.toString());
  HiveConf.setLongVar(jonconf, HiveConf.ConfVars.MAPREDMINSPLITSIZE, minSplitSize);
  InputSplit[] splits = inputFormat.getSplits(jonconf, splitNumber);
  int readCount = 0;
  for (int i = 0; i < splits.length; i++) {
    int previousReadCount = readCount;
    RecordReader rr = inputFormat.getRecordReader(splits[i], jonconf, Reporter.NULL);
    Object key = rr.createKey();
    Object value = rr.createValue();
    // count how many records this split yields
    while (rr.next(key, value)) {
      readCount++;
    }
    rr.close();
    System.out.println("The " + i + "th split read "
        + (readCount - previousReadCount));
  }
  // ... (final comparison of readCount against writeCount elided in the original excerpt)
}

Code example source: apache/hive

OutputFormat<?, ?> outFormat = new OrcOutputFormat();
RecordWriter writer =
    outFormat.getRecordWriter(fs, conf, testFilePath.toString(),
        Reporter.NULL);
// ... (the original excerpt elides the rows written via writer.write(NullWritable.get(), ...)
// and the serde setup)
inspector = (StructObjectInspector) serde.getObjectInspector();
InputFormat<?,?> in = new OrcInputFormat();
FileInputFormat.setInputPaths(conf, testFilePath.toString());
InputSplit[] splits = in.getSplits(conf, 1);
assertEquals(1, splits.length);
// only read column 1 ("r"); column 0 ("z") stays unread
ColumnProjectionUtils.appendReadColumns(conf, Collections.singletonList(1));
conf.set("columns", "z,r");
conf.set("columns.types", "int:struct<x:int,y:int>");
org.apache.hadoop.mapred.RecordReader reader =
    in.getRecordReader(splits[0], conf, Reporter.NULL);
Object key = reader.createKey();
Object value = reader.createValue();
int rowNum = 0;
List<? extends StructField> fields = inspector.getAllStructFieldRefs();
IntObjectInspector intInspector =
    (IntObjectInspector) fields.get(0).getFieldObjectInspector();
while (reader.next(key, value)) {
  // the unselected column comes back as null, the projected struct is populated
  assertEquals(null, inspector.getStructFieldData(value, fields.get(0)));
  Object sub = inspector.getStructFieldData(value, fields.get(1));
  // ... (assertions on the struct's fields elided in the original excerpt)
  rowNum++;
}
reader.close();

Code example source: apache/hive

FileSystem fs = dataDir1.getFileSystem(job);
int symbolLinkedFileSize = 0;
// ... (the original excerpt elides the creation of dir1_file1, dir2_file2,
// the symlink directory and the summary object "cs")
symbolLinkedFileSize += fs.getFileStatus(dir1_file1).getLen();
Path dir1_file2 = new Path(dataDir1, "file2");
writeTextFile(dir1_file2,
              "dir1_file2_line1\n" +
              "dir2_file2_line2\n");
symbolLinkedFileSize += fs.getFileStatus(dir2_file2).getLen();
assertEquals(0, cs.getDirectoryCount());
FileInputFormat.setInputPaths(job, symlinkDir);
InputSplit[] splits = inputFormat.getSplits(job, 2);
for (InputSplit split : splits) {
  RecordReader<LongWritable, Text> reader =
      inputFormat.getRecordReader(split, job, reporter);
  LongWritable key = reader.createKey();
  Text value = reader.createValue();
  // collect every line read through the symlinked input
  while (reader.next(key, value)) {
    received.add(value.toString());
  }
  reader.close();
}

Code example source: apache/asterixdb

// excerpt: advance to the next input split by closing the current reader
// and opening a new one over inputSplits[currentSplitIndex]
String fileName = ((FileSplit) (inputSplits[currentSplitIndex])).getPath().toUri().getPath();
FileStatus fileStatus = hdfs.getFileStatus(new Path(fileName));
// ... (checks against fileStatus elided in the original excerpt)
reader.close();
reader = getRecordReader(currentSplitIndex);
return true;

Code example source: com.backtype/dfs-datastores

JobConf conf = new JobConf();
FileInputFormat.addInputPath(conf, new Path(path));
InputSplit[] splits = informat.getSplits(conf, 10000);
assertTrue(splits.length > 3); // want to test that splitting is working b/c i made really big files
for (InputSplit split : splits) {
  RecordReader<Text, BytesWritable> rr = informat.getRecordReader(split, conf, Reporter.NULL);
  Text t = new Text();
  BytesWritable b = new BytesWritable();
  while (rr.next(t, b)) {
    results.put(t.toString(), new String(Utils.getBytes(b)));
  }
  rr.close();
}

Code example source: apache/hive

@Test
public void testVectorizationWithAcid() throws Exception {
  StructObjectInspector inspector = new BigRowInspector();
  JobConf conf = createMockExecutionEnvironment(workDir, new Path("mock:///"),
      "vectorizationAcid", inspector, true, 1);
  conf.set(ValidTxnList.VALID_TXNS_KEY,
      new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
  Path partDir = new Path(conf.get("mapred.input.dir"));
  OrcRecordUpdater writer = new OrcRecordUpdater(partDir,
      new AcidOutputFormat.Options(conf).maximumWriteId(10) /* remaining options elided */);
  // ... (the original excerpt elides writing 100 BigRow records, generating the
  // splits and creating the vectorized "reader" for the single split)
  Path path = new Path("mock:/vectorizationAcid/p=0/base_0000010/bucket_00000");
  setBlocks(path, conf, new MockBlock("host0", "host1"));
  assertEquals(1, splits.length);
  conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, BigRow.getColumnNamesProperty());
  NullWritable key = reader.createKey();
  VectorizedRowBatch value = reader.createValue();
  assertEquals(true, reader.next(key, value));
  assertEquals(100, value.count());
  LongColumnVector booleanColumn = (LongColumnVector) value.cols[0];
  // ... (per-column assertions, including timestampColumn.getTime(i), elided)
  assertEquals(false, reader.next(key, value));
}

Code example source: apache/chukwa

private void verifyInputFormatForSequenceFile() {
  try {
    JobConf conf = new JobConf();
    String TMP_DIR = System.getProperty("test.build.data", "/tmp");
    Path filename = new Path("file:///" + TMP_DIR + "/tmpSeqFile");
    SequenceFile.Writer sfw = SequenceFile.createWriter(FileSystem
        .getLocal(conf), conf, filename, ChukwaArchiveKey.class,
        ChunkImpl.class, SequenceFile.CompressionType.NONE, Reporter.NULL);
    // ... (the original excerpt elides writing the "lines" as chunks and closing sfw)
    long len = FileSystem.getLocal(conf).getFileStatus(filename).getLen();
    InputSplit split = new FileSplit(filename, 0, len, (String[]) null);
    ChukwaInputFormat in = new ChukwaInputFormat();
    RecordReader<LongWritable, Text> r = in.getRecordReader(split, conf,
        Reporter.NULL);
    LongWritable l = r.createKey();
    Text line = r.createValue();
    for (int i = 0; i < lines.length * 2; ++i) {
      boolean succeeded = r.next(l, line);
      assertTrue(succeeded);
      assertEquals(i, l.get());
      assertEquals(lines[i % lines.length], line.toString());
      System.out.println("read line: " + l.get() + " " + line);
    }
    boolean succeeded = r.next(l, line);
    assertFalse(succeeded);
  } catch (IOException e) {
    fail("IO exception: " + e); // exception handling not shown in the original excerpt
  }
}

Code example source: apache/hive

// excerpt: "inspector" is created earlier via
// ObjectInspectorFactory.getReflectionObjectInspector(..., ObjectInspectorOptions.JAVA)
JobConf conf = createMockExecutionEnvironment(workDir, new Path("mock:///"),
    "vectorBuckets", inspector, true, 1);
Path path = new Path(conf.get("mapred.input.dir") + "/0_0");
Writer writer =
    OrcFile.createWriter(path,
        OrcFile.writerOptions(conf).inspector(inspector)); // writer options abbreviated here
// ... (the original excerpt elides writing 10 rows, generating the split and
// creating the vectorized "reader")
conf.setInt(hive_metastoreConstants.BUCKET_COUNT, 3);
HiveInputFormat<?,?> inputFormat =
    new HiveInputFormat<WritableComparable, Writable>();
NullWritable key = reader.createKey();
VectorizedRowBatch value = reader.createValue();
assertEquals(true, reader.next(key, value));
assertEquals(10, value.count());
LongColumnVector col0 = (LongColumnVector) value.cols[0];
for (int i = 0; i < 10; i++) {
  assertEquals("checking " + i, i, col0.vector[i]);
}
assertEquals(false, reader.next(key, value));

Code example source: elephantscale/hadoop-book

TeraInputFormat inFormat = new TeraInputFormat();
TextSampler sampler = new TextSampler();
Text key = new Text();
Text value = new Text();
int partitions = conf.getNumReduceTasks();
long sampleSize = conf.getLong(SAMPLE_SIZE, 100000);
InputSplit[] splits = inFormat.getSplits(conf, conf.getNumMapTasks());
int samples = Math.min(10, splits.length);
long recordsPerSample = sampleSize / samples;
int sampleStep = splits.length / samples;
long records = 0;
// take a sample of input keys from evenly spaced splits
for (int i = 0; i < samples; ++i) {
  RecordReader<Text, Text> reader =
      inFormat.getRecordReader(splits[sampleStep * i], conf, null);
  while (reader.next(key, value)) {
    sampler.addKey(key);
    records += 1;
    if ((i + 1) * recordsPerSample <= records) {
      break;
    }
  }
}
// "partFile" (the partition list output) is defined earlier in the original method
FileSystem outFs = partFile.getFileSystem(conf);
if (outFs.exists(partFile)) {
  outFs.delete(partFile, false);
}

Code example source: apache/hive

System.out.println("Files found: ");
for (AcidUtils.ParsedDelta pd : current) {
 System.out.println(pd.getPath().toString());
JobConf job = new JobConf();
job.set("mapred.input.dir", partitionPath.toString());
job.set(BUCKET_COUNT, Integer.toString(buckets));
job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, "id,msg");
job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, "bigint:string");
job.set(ValidWriteIdList.VALID_WRITEIDS_KEY, writeIds.toString());
job.set(ValidTxnList.VALID_TXNS_KEY, conf.get(ValidTxnList.VALID_TXNS_KEY));
InputSplit[] splits = inf.getSplits(job, buckets);
Assert.assertEquals(numExpectedFiles, splits.length);
org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct> rr =
    inf.getRecordReader(splits[0], job, Reporter.NULL);
NullWritable key = rr.createKey();
OrcStruct value = rr.createValue();
for (String record : records) {
 Assert.assertEquals(true, rr.next(key, value));
 Assert.assertEquals(record, value.toString());
Assert.assertEquals(false, rr.next(key, value));

Code example source: prestodb/presto

// excerpt from a test helper; the method signature ("... throws Exception") is abbreviated
JobConf configuration = new JobConf(new Configuration(false));
configuration.set(READ_COLUMN_IDS_CONF_STR, "0");
configuration.setBoolean(READ_ALL_COLUMNS, false);
RecordReader<K, V> recordReader = inputFormat.getRecordReader(
        new FileSplit(new Path(tempFile.getFile().getAbsolutePath()), 0, tempFile.getFile().length(), (String[]) null),
        configuration,
        NULL);
K key = recordReader.createKey();
V value = recordReader.createValue();
while (recordReader.next(key, value)) {
    Object expectedValue = iterator.next();
    // ... (comparison of expectedValue against the decoded column value elided)
}
