This article collects a number of code examples for the Java class org.apache.hadoop.mapred.RecordReader and shows how the class is used in practice. The snippets come from selected projects on GitHub, Stack Overflow, Maven, and similar platforms, so they should serve as useful references. Details of the RecordReader class:
Package path: org.apache.hadoop.mapred.RecordReader
Class name: RecordReader
Description: RecordReader reads <key, value> pairs from an InputSplit. A RecordReader typically converts the byte-oriented view of the input provided by the InputSplit into a record-oriented view for the Mapper and Reducer tasks to process. It thereby takes on the responsibility of handling record boundaries and presenting the tasks with keys and values.
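All of the snippets below follow the same consumption pattern: ask an InputFormat for a RecordReader, reuse a single key/value pair obtained from createKey()/createValue(), loop on next() until it returns false, and close the reader. The following minimal sketch illustrates that pattern end to end; TextInputFormat and the local input path are assumptions chosen purely for illustration and are not taken from the examples below.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;

public class RecordReaderSketch {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf();
    // Hypothetical input path, used only for this sketch.
    FileInputFormat.setInputPaths(conf, new Path("/tmp/input.txt"));
    TextInputFormat format = new TextInputFormat();
    format.configure(conf);

    for (InputSplit split : format.getSplits(conf, 1)) {
      RecordReader<LongWritable, Text> reader =
          format.getRecordReader(split, conf, Reporter.NULL);
      try {
        // Reuse one key/value pair; next() fills them in and returns false at end of split.
        LongWritable key = reader.createKey();
        Text value = reader.createValue();
        while (reader.next(key, value)) {
          System.out.println(key.get() + "\t" + value);
        }
      } finally {
        reader.close(); // always release the underlying stream
      }
    }
  }
}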
Code example source: apache/incubator-gobblin
/**
 * {@inheritDoc}.
 *
 * This method will throw a {@link ClassCastException} if type {@link #<D>} is not compatible
 * with type {@link #<K>} if keys are supposed to be read, or if it is not compatible with type
 * {@link #<V>} if values are supposed to be read.
 */
@Override
@SuppressWarnings("unchecked")
public D readRecord(@Deprecated D reuse) throws DataRecordException, IOException {
  K key = this.recordReader.createKey();
  V value = this.recordReader.createValue();
  if (this.recordReader.next(key, value)) {
    return this.readKeys ? (D) key : (D) value;
  }
  return null;
}
Code example source: apache/hive
protected org.apache.hadoop.mapred.RecordReader setReaderAtSplit(int splitNum)
    throws IOException {
  JobConf localJc = getLocalFSJobConfClone(jc);
  currentSplitPointer = splitNum;
  if (rr != null) {
    rr.close();
  }
  // open record reader to read next split
  rr = inputFormat.getRecordReader(inputSplits[currentSplitPointer], jobCloneUsingLocalFs,
      reporter);
  currentSplitPointer++;
  return rr;
}
Code example source: prestodb/presto
@Override
public float getProgress()
    throws IOException
{
  return delegate.getProgress();
}
Code example source: apache/hive
PassThruOffsetReader(RecordReader sourceReader) {
  this.sourceReader = sourceReader;
  key = sourceReader.createKey();
  value = (Writable) sourceReader.createValue();
}
Code example source: apache/flink
@Override
public void open(HadoopInputSplit split) throws IOException {
  // enforce sequential open() calls
  synchronized (OPEN_MUTEX) {
    this.recordReader = this.mapredInputFormat.getRecordReader(
        split.getHadoopInputSplit(), jobConf, new HadoopDummyReporter());
    if (this.recordReader instanceof Configurable) {
      ((Configurable) this.recordReader).setConf(jobConf);
    }
    key = this.recordReader.createKey();
    value = this.recordReader.createValue();
    this.fetched = false;
  }
}
Code example source: apache/hive
Configuration conf = new Configuration();
OrcOutputFormat of = new OrcOutputFormat();
FileSystem fs = FileSystem.getLocal(conf);
Path root = new Path(tmpDir, "testRecordReaderDelta").makeQualified(fs);
fs.delete(root, true);
ObjectInspector inspector;
synchronized (TestOrcFile.class) {
  // inspector creation elided in the original excerpt
}
// writing the test data and setting up job, inf and values elided in the original excerpt
job.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true);
InputSplit[] splits = inf.getSplits(job, 5);
assertEquals(2, splits.length);
org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct> rr;
for (int j = 0; j < splits.length; j++) {
  InputSplit split = splits[j];
  rr = inf.getRecordReader(split, job, Reporter.NULL);
  OrcStruct row = rr.createValue();
  for (int i = 0; i < values[j].length; ++i) {
    System.out.println("Checking " + i);
    String msg = "split[" + j + "] at i=" + i;
    assertEquals(msg, true, rr.next(NullWritable.get(), row));
    assertEquals(msg, values[j][i], row.getFieldValue(0).toString());
  }
  // after the expected rows the reader must be exhausted
  assertEquals(false, rr.next(NullWritable.get(), row));
}
Code example source: ZuInnoTe/hadoopoffice
@Test
public void readExcelInputFormatExcel2003Empty() throws IOException {
  JobConf job = new JobConf(defaultConf);
  ClassLoader classLoader = getClass().getClassLoader();
  String fileName = "excel2003empty.xls";
  String fileNameSpreadSheet = classLoader.getResource(fileName).getFile();
  Path file = new Path(fileNameSpreadSheet);
  FileInputFormat.setInputPaths(job, file);
  // set locale to the one of the test data
  job.set("hadoopoffice.locale.bcp47", "de");
  ExcelFileInputFormat format = new ExcelFileInputFormat();
  format.configure(job);
  InputSplit[] inputSplits = format.getSplits(job, 1);
  assertEquals(1, inputSplits.length, "Only one split generated for Excel file");
  RecordReader<Text, ArrayWritable> reader = format.getRecordReader(inputSplits[0], job, reporter);
  assertNotNull(reader, "Format returned null RecordReader");
  Text spreadSheetKey = new Text();
  ArrayWritable spreadSheetValue = new ArrayWritable(SpreadSheetCellDAO.class);
  assertTrue(reader.next(spreadSheetKey, spreadSheetValue), "Input Split for Excel file contains row 1");
  assertEquals(0, spreadSheetValue.get().length, "Input Split for Excel file contain row 1 and is empty");
  assertFalse(reader.next(spreadSheetKey, spreadSheetValue),
      "Input Split for Excel file contains no further row");
}
Code example source: org.apache.hadoop/hadoop-mapred-test
@SuppressWarnings("unchecked") // InputFormat instantiation
static long readBench(JobConf conf) throws IOException {
  InputFormat inf = conf.getInputFormat();
  final String fn = conf.get("test.filebench.name", "");
  Path pin = new Path(FileInputFormat.getInputPaths(conf)[0], fn);
  FileStatus in = pin.getFileSystem(conf).getFileStatus(pin);
  RecordReader rr = inf.getRecordReader(new FileSplit(pin, 0, in.getLen(),
      (String[]) null), conf, Reporter.NULL);
  try {
    Object key = rr.createKey();
    Object val = rr.createValue();
    Date start = new Date();
    while (rr.next(key, val));
    Date end = new Date();
    return end.getTime() - start.getTime();
  } finally {
    rr.close();
  }
}
Code example source: ZuInnoTe/hadoopcryptoledger
@Test
public void readEthereumBlockInputFormatBlock3346406() throws IOException, EthereumBlockReadException, ParseException, InterruptedException {
  JobConf job = new JobConf(defaultConf);
  ClassLoader classLoader = getClass().getClassLoader();
  String fileName = "eth3346406.bin";
  String fileNameBlock = classLoader.getResource("testdata/" + fileName).getFile();
  Path file = new Path(fileNameBlock);
  FileInputFormat.setInputPaths(job, file);
  EthereumBlockFileInputFormat format = new EthereumBlockFileInputFormat();
  format.configure(job);
  InputSplit[] inputSplits = format.getSplits(job, 1);
  assertEquals(1, inputSplits.length, "Only one split generated for genesis block");
  RecordReader<BytesWritable, EthereumBlock> reader = format.getRecordReader(inputSplits[0], job, reporter);
  assertNotNull(reader, "Format returned null RecordReader");
  BytesWritable key = new BytesWritable();
  EthereumBlock block = new EthereumBlock();
  assertTrue(reader.next(key, block), "Input Split for block 3346406 contains at least one block");
  assertEquals(7, block.getEthereumTransactions().size(), "Block 3346406 must have 7 transactions");
  assertFalse(reader.next(key, block), "No further blocks in block 3346406");
  reader.close();
}
Code example source: apache/hive
private void writeThenReadByRecordReader(int intervalRecordCount,
    int writeCount, int splitNumber, long minSplitSize, CompressionCodec codec)
    throws IOException {
  Path testDir = new Path(System.getProperty("test.tmp.dir", ".")
      + "/mapred/testsmallfirstsplit");
  Path testFile = new Path(testDir, "test_rcfile");
  fs.delete(testFile, true);
  Configuration cloneConf = new Configuration(conf);
  RCFileOutputFormat.setColumnNumber(cloneConf, bytesArray.length);
  JobConf jonconf = new JobConf(cloneConf);
  jonconf.set("mapred.input.dir", testDir.toString());
  HiveConf.setLongVar(jonconf, HiveConf.ConfVars.MAPREDMINSPLITSIZE, minSplitSize);
  InputSplit[] splits = inputFormat.getSplits(jonconf, splitNumber);
  // the loop over the splits that defines the index i is elided in the original excerpt
  int previousReadCount = readCount;
  RecordReader rr = inputFormat.getRecordReader(splits[i], jonconf, Reporter.NULL);
  Object key = rr.createKey();
  Object value = rr.createValue();
  while (rr.next(key, value)) {
    readCount++;
  }
  rr.close();
  System.out.println("The " + i + "th split read "
      + (readCount - previousReadCount));
Code example source: apache/hive
OutputFormat<?, ?> outFormat = new OrcOutputFormat();
RecordWriter writer =
outFormat.getRecordWriter(fs, conf, testFilePath.toString(),
Reporter.NULL);
writer.write(NullWritable.get(),
inspector = (StructObjectInspector) serde.getObjectInspector();
InputFormat<?,?> in = new OrcInputFormat();
FileInputFormat.setInputPaths(conf, testFilePath.toString());
InputSplit[] splits = in.getSplits(conf, 1);
assertEquals(1, splits.length);
ColumnProjectionUtils.appendReadColumns(conf, Collections.singletonList(1));
conf.set("columns", "z,r");
conf.set("columns.types", "int:struct<x:int,y:int>");
org.apache.hadoop.mapred.RecordReader reader =
in.getRecordReader(splits[0], conf, Reporter.NULL);
Object key = reader.createKey();
Object value = reader.createValue();
int rowNum = 0;
List<? extends StructField> fields = inspector.getAllStructFieldRefs();
IntObjectInspector intInspector =
(IntObjectInspector) fields.get(0).getFieldObjectInspector();
while (reader.next(key, value)) {
assertEquals(null, inspector.getStructFieldData(value, fields.get(0)));
Object sub = inspector.getStructFieldData(value, fields.get(1));
reader.close();
Code example source: apache/hive
FileSystem fs = dataDir1.getFileSystem(job);
int symbolLinkedFileSize = 0;
symbolLinkedFileSize += fs.getFileStatus(dir1_file1).getLen();
Path dir1_file2 = new Path(dataDir1, "file2");
writeTextFile(dir1_file2,
    "dir1_file2_line1\n" +
    "dir2_file2_line2\n");
symbolLinkedFileSize += fs.getFileStatus(dir2_file2).getLen();
assertEquals(0, cs.getDirectoryCount());
FileInputFormat.setInputPaths(job, symlinkDir);
InputSplit[] splits = inputFormat.getSplits(job, 2);
// the enclosing loop over splits and the assignment of the returned reader
// are elided in the original excerpt
inputFormat.getRecordReader(split, job, reporter);
LongWritable key = reader.createKey();
Text value = reader.createValue();
while (reader.next(key, value)) {
  received.add(value.toString());
}
reader.close();
Code example source: apache/asterixdb
String fileName = ((FileSplit) (inputSplits[currentSplitIndex])).getPath().toUri().getPath();
FileStatus fileStatus = hdfs.getFileStatus(new Path(fileName));
reader.close();
reader = getRecordReader(currentSplitIndex);
return true;
Code example source: com.backtype/dfs-datastores
JobConf conf = new JobConf();
FileInputFormat.addInputPath(conf, new Path(path));
InputSplit[] splits = informat.getSplits(conf, 10000);
assertTrue(splits.length > 3); // want to test that splitting is working b/c i made really big files
for (InputSplit split : splits) {
  RecordReader<Text, BytesWritable> rr = informat.getRecordReader(split, conf, Reporter.NULL);
  Text t = new Text();
  BytesWritable b = new BytesWritable();
  while (rr.next(t, b)) {
    results.put(t.toString(), new String(Utils.getBytes(b)));
  }
  rr.close();
}
Code example source: apache/hive
@Test
public void testVectorizationWithAcid() throws Exception {
StructObjectInspector inspector = new BigRowInspector();
JobConf conf = createMockExecutionEnvironment(workDir, new Path("mock:///"),
"vectorizationAcid", inspector, true, 1);
conf.set(ValidTxnList.VALID_TXNS_KEY,
new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
Path partDir = new Path(conf.get("mapred.input.dir"));
OrcRecordUpdater writer = new OrcRecordUpdater(partDir,
new AcidOutputFormat.Options(conf).maximumWriteId(10)
Path path = new Path("mock:/vectorizationAcid/p=0/base_0000010/bucket_00000");
setBlocks(path, conf, new MockBlock("host0", "host1"));
assertEquals(1, splits.length);
conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, BigRow.getColumnNamesProperty());
NullWritable key = reader.createKey();
VectorizedRowBatch value = reader.createValue();
assertEquals(true, reader.next(key, value));
assertEquals(100, value.count());
LongColumnVector booleanColumn = (LongColumnVector) value.cols[0];
timestampColumn.getTime(i));
assertEquals(false, reader.next(key, value));
Code example source: apache/chukwa
private void verifyInputFormatForSequenceFile() {
  try {
    JobConf conf = new JobConf();
    String TMP_DIR = System.getProperty("test.build.data", "/tmp");
    Path filename = new Path("file:///" + TMP_DIR + "/tmpSeqFile");
    SequenceFile.Writer sfw = SequenceFile.createWriter(FileSystem
        .getLocal(conf), conf, filename, ChukwaArchiveKey.class,
        ChunkImpl.class, SequenceFile.CompressionType.NONE, Reporter.NULL);
    // writing the test chunks is elided in the original excerpt
    long len = FileSystem.getLocal(conf).getFileStatus(filename).getLen();
    InputSplit split = new FileSplit(filename, 0, len, (String[]) null);
    ChukwaInputFormat in = new ChukwaInputFormat();
    // the start of this call is cut off in the original excerpt; the declaration of r is assumed
    RecordReader<LongWritable, Text> r = in.getRecordReader(split, conf,
        Reporter.NULL);
    LongWritable l = r.createKey();
    Text line = r.createValue();
    for (int i = 0; i < lines.length * 2; ++i) {
      boolean succeeded = r.next(l, line);
      assertTrue(succeeded);
      assertEquals(i, l.get());
      assertEquals(lines[i % lines.length], line.toString());
      System.out.println("read line: " + l.get() + " " + line);
    }
    boolean succeeded = r.next(l, line);
    assertFalse(succeeded);
Code example source: apache/hive
ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
JobConf conf = createMockExecutionEnvironment(workDir, new Path("mock:///"),
"vectorBuckets", inspector, true, 1);
Path path = new Path(conf.get("mapred.input.dir") + "/0_0");
Writer writer =
OrcFile.createWriter(path,
conf.setInt(hive_metastoreConstants.BUCKET_COUNT, 3);
HiveInputFormat<?,?> inputFormat =
new HiveInputFormat<WritableComparable, Writable>();
NullWritable key = reader.createKey();
VectorizedRowBatch value = reader.createValue();
assertEquals(true, reader.next(key, value));
assertEquals(10, value.count());
LongColumnVector col0 = (LongColumnVector) value.cols[0];
assertEquals("checking " + i, i, col0.vector[i]);
assertEquals(false, reader.next(key, value));
Code example source: elephantscale/hadoop-book
TeraInputFormat inFormat = new TeraInputFormat();
TextSampler sampler = new TextSampler();
Text key = new Text();
Text value = new Text();
int partitions = conf.getNumReduceTasks();
long sampleSize = conf.getLong(SAMPLE_SIZE, 100000);
InputSplit[] splits = inFormat.getSplits(conf, conf.getNumMapTasks());
int samples = Math.min(10, splits.length);
long recordsPerSample = sampleSize / samples;
RecordReader<Text, Text> reader =
inFormat.getRecordReader(splits[sampleStep * i], conf, null);
while (reader.next(key, value)) {
sampler.addKey(key);
records += 1;
FileSystem outFs = partFile.getFileSystem(conf);
if (outFs.exists(partFile)) {
outFs.delete(partFile, false);
Code example source: apache/hive
System.out.println("Files found: ");
for (AcidUtils.ParsedDelta pd : current) {
System.out.println(pd.getPath().toString());
JobConf job = new JobConf();
job.set("mapred.input.dir", partitionPath.toString());
job.set(BUCKET_COUNT, Integer.toString(buckets));
job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, "id,msg");
job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, "bigint:string");
job.set(ValidWriteIdList.VALID_WRITEIDS_KEY, writeIds.toString());
job.set(ValidTxnList.VALID_TXNS_KEY, conf.get(ValidTxnList.VALID_TXNS_KEY));
InputSplit[] splits = inf.getSplits(job, buckets);
Assert.assertEquals(numExpectedFiles, splits.length);
org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct> rr =
inf.getRecordReader(splits[0], job, Reporter.NULL);
NullWritable key = rr.createKey();
OrcStruct value = rr.createValue();
for (String record : records) {
Assert.assertEquals(true, rr.next(key, value));
Assert.assertEquals(record, value.toString());
Assert.assertEquals(false, rr.next(key, value));
Code example source: prestodb/presto
throws Exception
JobConf configuration = new JobConf(new Configuration(false));
configuration.set(READ_COLUMN_IDS_CONF_STR, "0");
configuration.setBoolean(READ_ALL_COLUMNS, false);
RecordReader<K, V> recordReader = inputFormat.getRecordReader(
new FileSplit(new Path(tempFile.getFile().getAbsolutePath()), 0, tempFile.getFile().length(), (String[]) null),
configuration,
NULL);
K key = recordReader.createKey();
V value = recordReader.createValue();
while (recordReader.next(key, value)) {
Object expectedValue = iterator.next();