org.apache.hadoop.mapreduce.Counter类的使用及代码示例

x33g5p2x  于2022-01-18 转载在 其他  
字(16.8k)|赞(0)|评价(0)|浏览(271)

本文整理了Java中org.apache.hadoop.mapreduce.Counter类的一些代码示例,展示了Counter类的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Counter类的具体详情如下:
包路径:org.apache.hadoop.mapreduce.Counter
类名称:Counter

Counter介绍

[英]A named counter that tracks the progress of a map/reduce job.

Counters represent global counters, defined either by the Map-Reduce framework or applications. Each Counter is named by an Enum and has a long for the value.

Counters are bunched into Groups, each comprising counters from a particular Enum class.
[中]跟踪 Map/Reduce 作业进度的命名计数器。
Counters表示由Map Reduce框架或应用程序定义的全局计数器。每个Counter由一个枚举命名,并有一个long作为值。
Counters被分成若干组,每个组由特定Enum类的计数器组成。

代码示例

代码示例来源:origin: apache/hbase

@Override
public void setup(Context context) throws IOException {
 Configuration conf = context.getConfiguration();
 verifyChecksum = conf.getBoolean(CONF_CHECKSUM_VERIFY, true);
 filesGroup = conf.get(CONF_FILES_GROUP);
 filesUser = conf.get(CONF_FILES_USER);
 filesMode = (short)conf.getInt(CONF_FILES_MODE, 0);
 outputRoot = new Path(conf.get(CONF_OUTPUT_ROOT));
 inputRoot = new Path(conf.get(CONF_INPUT_ROOT));
 inputArchive = new Path(inputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
 outputArchive = new Path(outputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY);
  context.getCounter(c).increment(0);
 if (context.getConfiguration().getBoolean(Testing.CONF_TEST_FAILURE, false)) {
  testing.failuresCountToInject = conf.getInt(Testing.CONF_TEST_FAILURE_COUNT, 0);

代码示例来源:origin: apache/incubator-druid

job = Job.getInstance(
  new Configuration(),
  StringUtils.format("%s-index-generator-%s", config.getDataSource(), config.getIntervals())
);
job.getConfiguration().set("io.sort.record.percent", "0.23");
JobHelper.injectDruidProperties(job.getConfiguration(), config.getAllowedHadoopPrefix());
job.setMapperClass(IndexGeneratorMapper.class);
 log.info("No counters found for job [%s]", job.getJobName());
} else {
 Counter invalidRowCount = counters.findCounter(HadoopDruidIndexerConfig.IndexJobCounters.INVALID_ROW_COUNTER);
 if (invalidRowCount != null) {
  jobStats.setInvalidRowCount(invalidRowCount.getValue());
 } else {
  log.info("No invalid row counter found for job [%s]", job.getJobName());

代码示例来源:origin: apache/ignite

/**
 * {@inheritDoc}
 * <p>
 * For every counter in {@code rightGroup}, the counter with the same name is looked up in
 * {@code cntrs} under {@code name} and incremented by that counter's value.
 */
@Override public void incrAllCounters(CounterGroupBase<Counter> rightGroup) {
  for (final Counter other : rightGroup) {
    // Resolve the matching counter first, then apply the other side's value to it.
    final Counter target = cntrs.findCounter(name, other.getName());

    target.increment(other.getValue());
  }
}

代码示例来源:origin: apache/ignite

/**
 * {@inheritDoc}
 * <p>
 * Delegates to the three-argument {@code addCounter} overload with the given counter's name and
 * display name. The third argument is always {@code 0}; the given counter's own value is not
 * passed on.
 */
@Override public void addCounter(Counter counter) {
  final String cntrName = counter.getName();
  final String dispName = counter.getDisplayName();

  addCounter(cntrName, dispName, 0);
}

代码示例来源:origin: apache/incubator-gobblin

/**
 * Copies the Hadoop counters of this job run into the given
 * {@link org.apache.gobblin.metrics.GobblinMetrics} instance.
 *
 * <p>The counters of the {@code MetricGroup.JOB} group are transferred first, followed by those of
 * the {@code MetricGroup.TASK} group. Each Hadoop counter value is added to the metrics counter of
 * the same name. Nothing is written when the job reports no counters.
 *
 * @param metrics the metrics instance whose counters are incremented
 * @throws IOException declared for the call that fetches the job's counters
 */
@VisibleForTesting
void countersToMetrics(GobblinMetrics metrics) throws IOException {
 Optional<Counters> maybeCounters = Optional.fromNullable(this.job.getCounters());
 if (!maybeCounters.isPresent()) {
  // No counters available for this job: nothing to transfer
  return;
 }
 Counters hadoopCounters = maybeCounters.get();

 // Job-level counters
 CounterGroup jobGroup = hadoopCounters.getGroup(MetricGroup.JOB.name());
 for (Counter hadoopCounter : jobGroup) {
  String counterName = hadoopCounter.getName();
  metrics.getCounter(counterName).inc(hadoopCounter.getValue());
 }

 // Task-level counters
 CounterGroup taskGroup = hadoopCounters.getGroup(MetricGroup.TASK.name());
 for (Counter hadoopCounter : taskGroup) {
  String counterName = hadoopCounter.getName();
  metrics.getCounter(counterName).inc(hadoopCounter.getValue());
 }
}

代码示例来源:origin: apache/hive

MapCreate.writeCount = 0;
Configuration conf = new Configuration();
Job job = new Job(conf, "hcat mapreduce write test");
job.setJarByClass(this.getClass());
job.setMapperClass(HCatMapReduceTest.MapCreate.class);
 Path path = new Path(fs.getWorkingDirectory(), "mapred/testHCatMapReduceInput");
 createInputFile(path, writeCount);
 TextInputFormat.setInputPaths(job, path);
 Path path = new Path(fs.getWorkingDirectory(), "mapred/testHCatMapReduceInput");
 createInputFile(path, writeCount / 2);
 Path path2 = new Path(fs.getWorkingDirectory(), "mapred/testHCatMapReduceInput2");
 createInputFile(path2, (writeCount - writeCount / 2));
 job.getConfiguration().set(HCatConstants.HCAT_DYNAMIC_CUSTOM_PATTERN, customDynamicPathPattern);
 assertTrue(job.getCounters().getGroup("FileSystemCounters")
   .findCounter("FILE_BYTES_READ").getValue() > 0);

代码示例来源:origin: apache/incubator-druid

jobConf.setWorkingDirectory(new Path(converterConfig.getDistributedSuccessCache()));
final Job job = Job.getInstance(jobConf);
job.setInputFormatClass(ConfigInputFormat.class);
job.setMapperClass(ConvertingMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
  loadedBytes = job.getCounters().findCounter(COUNTER_GROUP, COUNTER_LOADED).getValue();
  writtenBytes = job.getCounters().findCounter(COUNTER_GROUP, COUNTER_WRITTEN).getValue();
 final Path jobDir = getJobPath(jobID, job.getWorkingDirectory());
 final FileSystem fs = jobDir.getFileSystem(job.getConfiguration());
 final RemoteIterator<LocatedFileStatus> it = fs.listFiles(jobDir, true);
 final List<Path> goodPaths = new ArrayList<>();
  if (locatedFileStatus.isFile()) {
   final Path myPath = locatedFileStatus.getPath();
   if (ConvertingOutputFormat.DATA_SUCCESS_KEY.equals(myPath.getName())) {
    goodPaths.add(new Path(myPath.getParent(), ConvertingOutputFormat.DATA_FILE_KEY));

代码示例来源:origin: apache/incubator-druid

@Override
protected void map(String key, String value, final Context context) throws IOException, InterruptedException
 final InputSplit split = context.getInputSplit();
 if (!(split instanceof DatasourceInputSplit)) {
  throw new IAE(
 final String tmpDirLoc = context.getConfiguration().get(TMP_FILE_LOC_KEY);
 final File tmpDir = Paths.get(tmpDirLoc).toFile();
 final HadoopDruidConverterConfig config = converterConfigFromConfiguration(context.getConfiguration());
 context.setStatus("DOWNLOADING");
 context.progress();
 final Path inPath = new Path(JobHelper.getURIFromSegment(segment));
 final File inDir = new File(tmpDir, "in");
 final long inSize = JobHelper.unzipNoGuava(inPath, context.getConfiguration(), inDir, context, null);
 log.debug("Loaded %d bytes into [%s] for converting", inSize, inDir.getAbsolutePath());
 context.getCounter(COUNTER_GROUP, COUNTER_LOADED).increment(inSize);
 context.setStatus("CONVERTING");
 context.getCounter(COUNTER_GROUP, COUNTER_WRITTEN).increment(finalSegment.getSize());
 context.progress();
 context.setStatus("Ready To Commit");

代码示例来源:origin: linkedin/camus

DOMConfigurator.configure("log4j.xml");
FileSystem fs = FileSystem.get(job.getConfiguration());
Path execBasePath = new Path(props.getProperty(ETL_EXECUTION_BASE_PATH));
Path execHistory = new Path(props.getProperty(ETL_EXECUTION_HISTORY_PATH));
  (long) (content.getQuota() * job.getConfiguration().getFloat(ETL_EXECUTION_HISTORY_MAX_OF_QUOTA, (float) .5));
limit = limit == 0 ? 50000 : limit;
 log.info("removing old execution: " + stat.getPath().getName());
 ContentSummary execContent = fs.getContentSummary(stat.getPath());
 currentCount -= execContent.getFileCount() + execContent.getDirectoryCount();
job.setMapperClass(EtlMapper.class);
job.setInputFormatClass(inputFormatClass);
Counters counters = job.getCounters();
for (String groupName : counters.getGroupNames()) {
 CounterGroup group = counters.getGroup(groupName);
 log.info("Group: " + group.getDisplayName());
 for (Counter counter : group) {
  log.info(counter.getDisplayName() + ":\t" + counter.getValue());

代码示例来源:origin: apache/hbase

String[] fams = families.split(",");
 for(String fam : fams) {
  scan.addFamily(Bytes.toBytes(fam));
sourceTable = sourceConnection.getTable(tableName);
final InputSplit tableSplit = context.getInputSplit();
String zkClusterKey = conf.get(NAME + ".peerQuorumAddress");
   new Path(peerFSAddress, peerSnapshotTmpDir), peerSnapshotName, scan, true);
} else {
 replicatedScanner = replicatedTable.getScanner(scan);
 break;
int rowCmpRet = Bytes.compareTo(value.getRow(), currentCompareRowInPeerTable.getRow());
if (rowCmpRet == 0) {
  context.getCounter(Counters.GOODROWS).increment(1);
  if (verbose) {
   LOG.info("Good row key: " + delimiter
     + Bytes.toStringBinary(value.getRow()) + delimiter);

代码示例来源:origin: apache/kylin

public void updateJobCounter() {
  try {
    Counters counters = job.getCounters();
    if (counters == null) {
      String errorMsg = "no counters for job " + getMrJobId();
      output.append(errorMsg);
    } else {
      this.output.append(counters.toString()).append("\n");
      logger.debug(counters.toString());
      mapInputRecords = String.valueOf(counters.findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue());
      rawInputBytesRead = String.valueOf(counters.findCounter(RawDataCounter.BYTES).getValue());
      String outputFolder = job.getConfiguration().get("mapreduce.output.fileoutputformat.outputdir",
          KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory());
      logger.debug("outputFolder is " + outputFolder);
      Path outputPath = new Path(outputFolder);
      String fsScheme = outputPath.getFileSystem(job.getConfiguration()).getScheme();
      long bytesWritten = counters.findCounter(fsScheme, FileSystemCounter.BYTES_WRITTEN).getValue();
      if (bytesWritten == 0) {
        logger.debug("Seems no counter found for " + fsScheme);
        bytesWritten = counters.findCounter("FileSystemCounters", "HDFS_BYTES_WRITTEN").getValue();

代码示例来源:origin: apache/incubator-gobblin

CompactionPathParser.CompactionParserResult result = new CompactionPathParser(state).parse(dataset);
Path tmpPath = configurator.getMrOutputPath();
Path dstPath = new Path (result.getDstAbsoluteDir());
long oldTotalRecords = helper.readRecordCount(new Path (result.getDstAbsoluteDir()));
long executeCount = helper.readExecutionCount (new Path (result.getDstAbsoluteDir()));
 Counter counter = job.getCounters().findCounter(AvroKeyMapper.EVENT_COUNTER.RECORD_COUNT);
 newTotalRecords = counter.getValue();
compactState.setProp(CompactionSlaEventHelper.RECORD_COUNT_TOTAL, Long.toString(newTotalRecords));
compactState.setProp(CompactionSlaEventHelper.EXEC_COUNT_TOTAL, Long.toString(executeCount + 1));
compactState.setProp(CompactionSlaEventHelper.MR_JOB_ID, this.configurator.getConfiguredJob().getJobID().toString());
helper.saveState(new Path (result.getDstAbsoluteDir()), compactState);
   CompactionSlaEventHelper.PREV_RECORD_COUNT_TOTAL, Long.toString(oldTotalRecords),
   CompactionSlaEventHelper.EXEC_COUNT_TOTAL, Long.toString(executeCount + 1),
   CompactionSlaEventHelper.MR_JOB_ID, this.configurator.getConfiguredJob().getJobID().toString());
 this.eventSubmitter.submit(CompactionSlaEventHelper.COMPACTION_RECORD_COUNT_EVENT, eventMetadataMap);

代码示例来源:origin: apache/hbase

Job job = new Job(conf);
job.setJobName(jobName);
job.setJarByClass(getClass());
  job,
  true,
  new Path(restoreDir)
);
Counters counters = job.getCounters();
long numRows = counters.findCounter(ScanCounter.NUM_ROWS).getValue();
long numCells = counters.findCounter(ScanCounter.NUM_CELLS).getValue();
long totalBytes = counters.findCounter(HBASE_COUNTER_GROUP_NAME, "BYTES_IN_RESULTS").getValue();
double throughput = (double)totalBytes / scanTimer.elapsed(TimeUnit.SECONDS);
double throughputRows = (double)numRows / scanTimer.elapsed(TimeUnit.SECONDS);

代码示例来源:origin: apache/ignite

final Job job = Job.getInstance(conf);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);
  job.setMapperClass(TestCountingMapper.class);
  job.setCombinerClass(TestCountingCombiner.class);
  FileInputFormat.setInputPaths(job, new Path("igfs://" + igfsName + "@" + PATH_INPUT));
  FileOutputFormat.setOutputPath(job, new Path("igfs://" + igfsName + "@" + PATH_OUTPUT));
  job.submit();
  final Counter cntr = job.getCounters().findCounter(TestCounter.COUNTER1);
  assertEquals(0, cntr.getValue());
  cntr.increment(10);
  assertEquals(10, cntr.getValue());
  assertEquals("wrong counters count", 3, counters.countCounters());
  assertEquals("wrong counter value", 15, counters.findCounter(TestCounter.COUNTER1).getValue());
  assertEquals("wrong counter value", 3, counters.findCounter(TestCounter.COUNTER2).getValue());
  assertEquals("wrong counter value", 3, counters.findCounter(TestCounter.COUNTER3).getValue());

代码示例来源:origin: aseldawy/spatialhadoop2

/**
 * Runs the sampling MapReduce job over {@code files} and returns the lines it produced.
 *
 * <p>The job writes to a randomly named temporary path ({@code temp_sample_NNNNNN}) that does not
 * exist beforehand. The number of lines read back equals the job's
 * {@code MAP_OUTPUT_RECORDS} counter, narrowed to {@code int}. The temporary path is deleted
 * recursively before returning.
 *
 * @param files input paths; the file system is resolved from the first entry
 * @param params parameters used to resolve the file system and passed on to the sampling job
 * @return the lines read back from the job output
 */
public static String[] takeSample(Path[] files, OperationsParams params) throws IOException, ClassNotFoundException, InterruptedException {
 final FileSystem fileSystem = files[0].getFileSystem(params);

 // Keep drawing random names until one is found that is not taken yet
 Path scratchPath = null;
 while (scratchPath == null || fileSystem.exists(scratchPath)) {
  final int suffix = (int)(Math.random()*1000000);
  scratchPath = new Path(String.format("temp_sample_%06d", suffix));
 }

 final Job sampleJob = sampleMapReduce(files, scratchPath, params);
 sampleJob.waitForCompletion(false);

 // The map output record count tells how many lines to read back
 final long emittedRecords =
   sampleJob.getCounters().findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getValue();
 final String[] sampledLines = Head.head(fileSystem, scratchPath, (int) emittedRecords);

 // Remove the temporary output together with everything below it
 fileSystem.delete(scratchPath, true);
 return sampledLines;
}

代码示例来源:origin: apache/bigtop

/**
 * Runs the verification MapReduce job over the table described by {@code htd} and asserts that
 * it completes successfully and that the reducers emit no output records.
 *
 * <p>The job scans the table with {@code VerifyMapper}, reduces with {@code VerifyReducer} and
 * writes to the {@code verify-output} directory under the test's MR output directory.
 *
 * @param conf configuration for the job; {@code "verify.scannercaching"} overrides the scanner
 *     caching, defaulting to {@code SCANNER_CACHING}
 * @param htd descriptor of the table to verify
 * @throws Exception if setting up or running the job fails
 */
private void doVerify(Configuration conf, HTableDescriptor htd) throws Exception {
 Path outputDir =
   new Path(HBaseTestUtil.getMROutputDir(TEST_NAME),
     "verify-output");
 Job job = new Job(conf);
 job.setJarByClass(this.getClass());
 job.setJobName(TEST_NAME + " Verification for " + htd.getNameAsString());
 Scan scan = new Scan();
 TableMapReduceUtil.initTableMapperJob(
   htd.getNameAsString(), scan, VerifyMapper.class,
   BytesWritable.class, BytesWritable.class, job);
 // Apply the configured value: passing the SCANNER_CACHING constant here would silently
 // ignore a "verify.scannercaching" override.
 int scannerCaching = conf.getInt("verify.scannercaching", SCANNER_CACHING);
 TableMapReduceUtil.setScannerCaching(job, scannerCaching);
 job.setReducerClass(VerifyReducer.class);
 job.setNumReduceTasks(NUM_REDUCE_TASKS);
 FileOutputFormat.setOutputPath(job, outputDir);
 assertTrue(job.waitForCompletion(true));
 // The test expects the reducers to emit no records at all
 long numOutputRecords = job.getCounters().findCounter(TaskCounter.REDUCE_OUTPUT_RECORDS).getValue();
 assertEquals(0, numOutputRecords);
}

代码示例来源:origin: larsgeorge/hbase-book

public void map(ImmutableBytesWritable row, Result columns, Context context)
throws IOException {
 context.getCounter(Counters.ROWS).increment(1);
 String value = null;
 try {
  for (Cell cell : columns.listCells()) {
   context.getCounter(Counters.COLS).increment(1);
   value = Bytes.toStringBinary(cell.getValueArray(),
    cell.getValueOffset(), cell.getValueLength());
   JSONObject json = (JSONObject) parser.parse(value);
   String author = (String) json.get("author");
   if (context.getConfiguration().get("conf.debug") != null)
    System.out.println("Author: " + author);
   context.write(new Text(author), ONE);
   context.getCounter(Counters.VALID).increment(1);
  System.err.println("Row: " + Bytes.toStringBinary(row.get()) +
   ", JSON: " + value);
  context.getCounter(Counters.ERROR).increment(1);

代码示例来源:origin: apache/hbase

/**
 * Runs the verification MapReduce job over the table described by {@code htd} and asserts that
 * it completes successfully with a {@code Counters.ROWS_WRITTEN} counter value of zero.
 *
 * @param conf configuration for the job; {@code "verify.scannercaching"} and
 *     {@code NUM_REDUCE_TASKS_KEY} are read from it, with {@code SCANNER_CACHING} and
 *     {@code NUM_REDUCE_TASKS_DEFAULT} as fallbacks
 * @param htd descriptor of the table to verify
 * @throws Exception if setting up or running the job fails
 */
protected void doVerify(Configuration conf, HTableDescriptor htd) throws Exception {
 final Path verifyOutputDir = getTestDir(TEST_NAME, "verify-output");
 LOG.info("Verify output dir: " + verifyOutputDir);

 // Basic job identity
 final Job verifyJob = Job.getInstance(conf);
 verifyJob.setJarByClass(this.getClass());
 verifyJob.setJobName(TEST_NAME + " Verification for " + htd.getTableName());
 setJobScannerConf(verifyJob);

 // Mapper side: scan over the table
 final Scan tableScan = new Scan();
 TableMapReduceUtil.initTableMapperJob(
   htd.getTableName().getNameAsString(), tableScan, VerifyMapper.class,
   BytesWritable.class, BytesWritable.class, verifyJob);
 TableMapReduceUtil.addDependencyJarsForClasses(verifyJob.getConfiguration(), AbstractHBaseTool.class);
 final int caching = conf.getInt("verify.scannercaching", SCANNER_CACHING);
 TableMapReduceUtil.setScannerCaching(verifyJob, caching);

 // Reducer side and output location
 verifyJob.setReducerClass(VerifyReducer.class);
 final int reducers = conf.getInt(NUM_REDUCE_TASKS_KEY, NUM_REDUCE_TASKS_DEFAULT);
 verifyJob.setNumReduceTasks(reducers);
 FileOutputFormat.setOutputPath(verifyJob, verifyOutputDir);

 assertTrue(verifyJob.waitForCompletion(true));

 // The test expects this counter to stay at zero
 assertEquals(0, verifyJob.getCounters().findCounter(Counters.ROWS_WRITTEN).getValue());
}

代码示例来源:origin: apache/hbase

/**
 * Creates and runs a {@code VerifyReplication} job with the given arguments and asserts its
 * {@code GOODROWS} and {@code BADROWS} counters.
 *
 * <p>The test fails if the job could not be created or did not complete successfully.
 *
 * @param args command-line arguments handed to {@code createSubmittableJob}
 * @param expectedGoodRows expected value of the {@code GOODROWS} counter
 * @param expectedBadRows expected value of the {@code BADROWS} counter
 */
private void runVerifyReplication(String[] args, int expectedGoodRows, int expectedBadRows)
  throws IOException, InterruptedException, ClassNotFoundException {
 final Configuration jobConf = new Configuration(conf1);
 final Job verifyJob = new VerifyReplication().createSubmittableJob(jobConf, args);
 if (verifyJob == null) {
  fail("Job wasn't created, see the log");
 }

 final boolean succeeded = verifyJob.waitForCompletion(true);
 if (!succeeded) {
  fail("Job failed, see the log");
 }

 final long goodRows =
  verifyJob.getCounters().findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue();
 assertEquals(expectedGoodRows, goodRows);

 final long badRows =
  verifyJob.getCounters().findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue();
 assertEquals(expectedBadRows, badRows);
}

代码示例来源:origin: apache/hbase

FileSystem fs = rootDir.getFileSystem(conf1);
String sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis();
SnapshotTestingUtils.createSnapshotAndValidate(utility1.getAdmin(), tableName,
FileSystem peerFs = peerRootDir.getFileSystem(conf2);
String peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis();
SnapshotTestingUtils.createSnapshotAndValidate(utility2.getAdmin(), tableName,
String temPath1 = utility1.getRandomDir().toString();
String temPath2 = "/tmp2";
 fail("Job wasn't created, see the log");
if (!job.waitForCompletion(true)) {
 fail("Job failed, see the log");
 job.getCounters().findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
assertEquals(0,
 job.getCounters().findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
 Cell firstVal = result.rawCells()[0];
 put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal),
  Bytes.toBytes("diff data"));
 htable2.put(put);
 job.getCounters().findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
assertEquals(NB_ROWS_IN_BATCH,
 job.getCounters().findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());

相关文章