This article collects code examples of the Java method org.apache.hadoop.mapred.RecordReader.next and shows how RecordReader.next is used in practice. The examples are extracted from selected projects hosted on platforms such as GitHub, Stack Overflow, and Maven, so they carry reasonable reference value. Details of the RecordReader.next method:
Package: org.apache.hadoop.mapred
Class: RecordReader
Method: next
Description: Reads the next key/value pair from the input for processing.
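Before the project snippets, here is a minimal, self-contained sketch of the typical calling pattern (createKey/createValue, loop on next, close). The class name, the choice of TextInputFormat, and the command-line input path are illustrative assumptions, not code taken from any of the projects quoted below.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;

public class RecordReaderNextExample {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf();
    FileInputFormat.setInputPaths(conf, new Path(args[0])); // input path supplied on the command line
    TextInputFormat inputFormat = new TextInputFormat();
    inputFormat.configure(conf);

    for (InputSplit split : inputFormat.getSplits(conf, 1)) {
      RecordReader<LongWritable, Text> reader =
          inputFormat.getRecordReader(split, conf, Reporter.NULL);
      LongWritable key = reader.createKey();   // reusable key holder
      Text value = reader.createValue();       // reusable value holder
      try {
        // next() fills key/value in place and returns false once the split is exhausted
        while (reader.next(key, value)) {
          System.out.println(key.get() + "\t" + value);
        }
      } finally {
        reader.close();
      }
    }
  }
}

Note that next() reuses the key/value objects created by createKey()/createValue(), which is why several examples below copy or recreate the value before buffering it.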
Code example source: apache/flink
protected void fetchNext() throws IOException {
  hasNext = this.recordReader.next(key, value);
  fetched = true;
}
Code example source: apache/hive
/**
 * If the current batch is empty, get a new one.
 * @return true if we have rows available.
 */
private boolean ensureBatch() throws IOException {
  if (rowInBatch >= batch.size) {
    rowInBatch = 0;
    return vrbReader.next(key, batch) && batch.size > 0;
  }
  return true;
}
Code example source: apache/hive
@Override
public boolean next() throws IOException {
  return sourceReader.next(key, value);
}
Code example source: apache/drill
/**
 * If the current batch is empty, get a new one.
 * @return true if we have rows available.
 */
private boolean ensureBatch() throws IOException {
  if (rowInBatch >= batch.size) {
    rowInBatch = 0;
    return vrbReader.next(key, batch) && batch.size > 0;
  }
  return true;
}
Code example source: apache/hive
/**
 * Skip header lines in the table file when reading the record.
 *
 * @param currRecReader
 *          Record reader.
 *
 * @param headerCount
 *          Header line number of the table files.
 *
 * @param key
 *          Key of current reading record.
 *
 * @param value
 *          Value of current reading record.
 *
 * @return Return true if there are 0 or more records left in the file
 *         after skipping all headers, otherwise return false.
 */
public static boolean skipHeader(RecordReader<WritableComparable, Writable> currRecReader,
    int headerCount, WritableComparable key, Writable value) throws IOException {
  while (headerCount > 0) {
    if (!currRecReader.next(key, value)) {
      return false;
    }
    headerCount--;
  }
  return true;
}
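A call site for skipHeader would typically drop the header lines once and then fall into the normal read loop. The sketch below is hypothetical; the headerCount value (e.g. taken from the skip.header.line.count table property) is an assumption for illustration.

// Hypothetical call site: skip headers once, then read the remaining records.
int headerCount = 1; // assumed, e.g. from the table's skip.header.line.count property
if (skipHeader(currRecReader, headerCount, key, value)) {
  while (currRecReader.next(key, value)) {
    // process key/value
  }
}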
Code example source: apache/drill
/**
 * Skip header lines in the table file when reading the record.
 *
 * @param currRecReader
 *          Record reader.
 *
 * @param headerCount
 *          Header line number of the table files.
 *
 * @param key
 *          Key of current reading record.
 *
 * @param value
 *          Value of current reading record.
 *
 * @return Return true if there are 0 or more records left in the file
 *         after skipping all headers, otherwise return false.
 */
public static boolean skipHeader(RecordReader<WritableComparable, Writable> currRecReader,
    int headerCount, WritableComparable key, Writable value) throws IOException {
  while (headerCount > 0) {
    if (!currRecReader.next(key, value)) {
      return false;
    }
    headerCount--;
  }
  return true;
}
Code example source: apache/hive
@Override
public boolean next(ImmutableBytesWritable key, ResultWritable value) throws IOException {
  return rr.next(key, value.getResult());
}
Code example source: apache/hive
/**
 * Call next and handle any exception inside it.
 * @param key
 * @param value
 * @return
 * @throws IOException
 */
private boolean doNextWithExceptionHandler(K key, V value) throws IOException {
  try {
    return curReader.next(key, value);
  } catch (Exception e) {
    return HiveIOExceptionHandlerUtil.handleRecordReaderNextException(e, jc);
  }
}
Code example source: apache/incubator-gobblin
/**
 * {@inheritDoc}.
 *
 * This method will throw a {@link ClassCastException} if type {@link #<D>} is not compatible
 * with type {@link #<K>} if keys are supposed to be read, or if it is not compatible with type
 * {@link #<V>} if values are supposed to be read.
 */
@Override
@SuppressWarnings("unchecked")
public D readRecord(@Deprecated D reuse) throws DataRecordException, IOException {
  K key = this.recordReader.createKey();
  V value = this.recordReader.createValue();
  if (this.recordReader.next(key, value)) {
    return this.readKeys ? (D) key : (D) value;
  }
  return null;
}
Code example source: apache/hive
@Override
public boolean next(NullWritable key, Row value) throws IOException {
  Preconditions.checkArgument(value != null);
  boolean hasNext = reader.next(key, data);
  if (hasNext) {
    // Deserialize data to column values, and populate the row record
    Object rowObj;
    try {
      StructObjectInspector rowOI = (StructObjectInspector) serde.getObjectInspector();
      rowObj = serde.deserialize(data);
      setRowFromStruct(value, rowObj, rowOI);
    } catch (SerDeException err) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Error deserializing row from data: " + data);
      }
      throw new IOException("Error deserializing row data", err);
    }
  }
  return hasNext;
}
Code example source: apache/drill
/**
 * Writes the value into the given value holder if the next value is available.
 * If not, checks whether any other available readers may hold the next value
 * and tries to obtain the value from them.
 *
 * @param value value holder
 * @return true if value was written, false otherwise
 */
protected boolean hasNextValue(Object value) {
  while (true) {
    try {
      if (reader.next(key, value)) {
        return true;
      }
      if (initNextReader(job)) {
        continue;
      }
      return false;
    } catch (IOException | ExecutionSetupException e) {
      throw new DrillRuntimeException(e);
    }
  }
}
Code example source: prestodb/presto
@Override
public boolean advanceNextPosition()
{
  try {
    if (closed || !recordReader.next(key, value)) {
      close();
      return false;
    }
    // reset loaded flags
    Arrays.fill(loaded, false);
    // decode value
    rowData = deserializer.deserialize(value);
    return true;
  }
  catch (IOException | SerDeException | RuntimeException e) {
    closeWithSuppression(this, e);
    if (e instanceof TextLineLengthLimitExceededException) {
      throw new PrestoException(HIVE_BAD_DATA, "Line too long in text file: " + path, e);
    }
    throw new PrestoException(HIVE_CURSOR_ERROR, e);
  }
}
Code example source: apache/hive
/**
 * Enqueue the most recently read record, and dequeue the earliest record in the queue.
 *
 * @param job
 *          Current job configuration.
 *
 * @param recordreader
 *          Record reader.
 *
 * @param key
 *          Key of current reading record.
 *
 * @param value
 *          Value of current reading record.
 *
 * @return Return false if the end of the file is reached, otherwise return true.
 */
public boolean updateBuffer(JobConf job, RecordReader recordreader,
    WritableComparable key, Writable value) throws IOException {
  key = ReflectionUtils.copy(job, (WritableComparable) buffer.get(cur).getFirst(), key);
  value = ReflectionUtils.copy(job, (Writable) buffer.get(cur).getSecond(), value);
  boolean notEOF = recordreader.next(buffer.get(cur).getFirst(), buffer.get(cur).getSecond());
  if (notEOF) {
    cur = (++cur) % buffer.size();
  }
  return notEOF;
}
Code example source: apache/drill
/**
 * Enqueue the most recently read record, and dequeue the earliest record in the queue.
 *
 * @param job
 *          Current job configuration.
 *
 * @param recordreader
 *          Record reader.
 *
 * @param key
 *          Key of current reading record.
 *
 * @param value
 *          Value of current reading record.
 *
 * @return Return false if the end of the file is reached, otherwise return true.
 */
public boolean updateBuffer(JobConf job, RecordReader recordreader,
    WritableComparable key, Writable value) throws IOException {
  key = ReflectionUtils.copy(job, (WritableComparable) buffer.get(cur).getFirst(), key);
  value = ReflectionUtils.copy(job, (Writable) buffer.get(cur).getSecond(), value);
  boolean notEOF = recordreader.next(buffer.get(cur).getFirst(), buffer.get(cur).getSecond());
  if (notEOF) {
    cur = (++cur) % buffer.size();
  }
  return notEOF;
}
Code example source: apache/hive
boolean notEOF = recordreader.next(key, value);
if (!notEOF) {
  return false;
Code example source: apache/hive
while (i < this.currentReadBlock.length && rr.next(key, val)) {
  nextSplit = false;
  this.currentReadBlock[i++] = (ROW) ObjectInspectorUtils.copyToStandardObject(serde
Code example source: apache/hive
private void confirmOutput(DataFormat rType) throws IOException, SerDeException, CloneNotSupportedException {
  Path[] paths = findFilesInBasePath();
  TFSOInputFormat input = new TFSOInputFormat(rType);
  FileInputFormat.setInputPaths(jc, paths);
  InputSplit[] splits = input.getSplits(jc, 1);
  RecordReader<NullWritable, Row> reader = input.getRecordReader(splits[0], jc,
      Mockito.mock(Reporter.class));
  NullWritable key = reader.createKey();
  Row value = reader.createValue();
  List<Row> results = new ArrayList<Row>(rows.size());
  List<Row> sortedRows = new ArrayList<Row>(rows.size());
  for (int i = 0; i < rows.size(); i++) {
    Assert.assertTrue(reader.next(key, value));
    results.add(value.clone());
    sortedRows.add(rows.get(i));
  }
  Assert.assertFalse(reader.next(key, value));
  Collections.sort(results);
  Collections.sort(sortedRows);
  for (int i = 0; i < rows.size(); i++) {
    Assert.assertTrue(sortedRows.get(i).equals(results.get(i)));
  }
}
Code example source: apache/hive
public static List<ArrayWritable> read(Path parquetFile) throws IOException {
  List<ArrayWritable> records = new ArrayList<ArrayWritable>();
  RecordReader<NullWritable, ArrayWritable> reader = new MapredParquetInputFormat().
      getRecordReader(new FileSplit(
          parquetFile, 0, fileLength(parquetFile), (String[]) null),
          new JobConf(), null);
  NullWritable alwaysNull = reader.createKey();
  ArrayWritable record = reader.createValue();
  while (reader.next(alwaysNull, record)) {
    records.add(record);
    record = reader.createValue(); // a new value so the last isn't clobbered
  }
  return records;
}
Code example source: prestodb/presto
while (recordReader.next(key, value)) {
  Object expectedValue = iterator.next();
Code example source: apache/hive
/**
 * Scenario: Empty input directory, i.e. no symlink file.
 *
 * Expected: Should return an empty result set without any exception.
 */
public void testAccuracy2() throws IOException {
  fileSystem.mkdirs(symlinkDir);
  FileInputFormat.setInputPaths(job, symlinkDir);
  SymlinkTextInputFormat inputFormat = new SymlinkTextInputFormat();
  ContentSummary cs = inputFormat.getContentSummary(symlinkDir, job);
  assertEquals(0, cs.getLength());
  assertEquals(0, cs.getFileCount());
  assertEquals(0, cs.getDirectoryCount());
  InputSplit[] splits = inputFormat.getSplits(job, 2);
  log.info("Number of splits: " + splits.length);
  // Read all values.
  List<String> received = new ArrayList<String>();
  for (InputSplit split : splits) {
    RecordReader<LongWritable, Text> reader =
        inputFormat.getRecordReader(split, job, reporter);
    LongWritable key = reader.createKey();
    Text value = reader.createValue();
    while (reader.next(key, value)) {
      received.add(value.toString());
    }
    reader.close();
  }
  List<String> expected = new ArrayList<String>();
  assertEquals(expected, received);
}