Usage and Code Examples of org.apache.kafka.common.Metric.value()


This article collects code examples of the Java method org.apache.kafka.common.Metric.value() and shows how it is used in practice. The examples are taken from selected open-source projects on GitHub, Stack Overflow, Maven, and similar platforms, and should be useful as references. Details of Metric.value() are as follows:
Package: org.apache.kafka.common
Class: Metric
Method: value

About Metric.value

Returns the value of the metric as a double if the metric is measurable, and 0.0 otherwise. (Note: value() has been deprecated since Kafka 1.0 in favor of metricValue(), which returns an Object.)
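To make that contract concrete, here is a minimal, self-contained sketch. The nested Metric interface is a simplified stand-in for org.apache.kafka.common.Metric (the real interface also exposes metricName(); it is redeclared here only so the example compiles on its own):

```java
import java.util.HashMap;
import java.util.Map;

public class MetricValueDemo {
    // Stand-in for org.apache.kafka.common.Metric, simplified for this sketch.
    interface Metric {
        double value();
    }

    // Null-safe lookup mirroring the documented contract: a missing
    // (or non-measurable) metric reads as 0.0.
    static double safeValue(Metric m) {
        return (m == null) ? 0.0 : m.value();
    }

    public static void main(String[] args) {
        Map<String, Metric> metrics = new HashMap<>();
        metrics.put("record-send-rate", () -> 42.5);

        System.out.println(safeValue(metrics.get("record-send-rate"))); // 42.5
        System.out.println(safeValue(metrics.get("missing-metric")));   // 0.0
    }
}
```

The null check matters in practice: Producer.metrics() and Consumer.metrics() return maps keyed by MetricName, and a lookup for a metric that has not been registered yet returns null rather than a zero-valued metric.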

Code Examples

Code example source: apache/flink

@Override
public Double getValue() {
  return kafkaMetric.value();
}

The identical getValue() implementation is repeated verbatim under several more origins: apache/flink (duplicate entries), org.apache.flink/flink-connector-kafka-base (and its _2.10 and _2.11 variants), org.apache.flink/flink-connector-kafka-0.11 (and its _2.11 variant), and com.alibaba.blink/flink-connector-kafka-0.11.
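The snippet above is an adapter: Flink's Gauge interface is implemented by delegating to the Kafka metric, so each metrics report re-reads the live value. A self-contained sketch of the pattern, with stand-in interfaces replacing org.apache.flink.metrics.Gauge and org.apache.kafka.common.Metric (names and shapes here are assumptions made for the sketch):

```java
public class KafkaMetricGaugeSketch {
    interface Metric { double value(); }  // stand-in for Kafka's Metric
    interface Gauge<T> { T getValue(); }  // stand-in for Flink's Gauge

    // Adapter: re-reads the underlying Kafka metric on every getValue() call,
    // so the reported number is always current, not a snapshot.
    static Gauge<Double> wrap(Metric kafkaMetric) {
        return () -> kafkaMetric.value();
    }

    public static void main(String[] args) {
        Metric m = () -> 3.25;
        Gauge<Double> g = wrap(m);
        System.out.println(g.getValue()); // 3.25
    }
}
```

Holding a reference to the Metric rather than copying its value is the key design choice: the gauge stays cheap to register and never goes stale.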

Code example source: com.netflix.suro/suro-kafka-producer

@Override
public String getStat() {
  Map<MetricName, ? extends Metric> metrics = producer.metrics();
  StringBuilder sb = new StringBuilder();
  // add kafka producer stats, which are rates
  for (Map.Entry<MetricName, ? extends Metric> e : metrics.entrySet()) {
    sb.append("kafka.").append(e.getKey()).append(": ").append(e.getValue().value()).append('\n');
  }
  return sb.toString();
}

Code example source: allegro/hermes

private void registerGauge(Producer<byte[], byte[]> producer, HermesMetrics metrics, String gauge,
                           Predicate<Map.Entry<MetricName, ? extends Metric>> predicate) {
  metrics.registerGauge(gauge, () -> {
    Optional<? extends Map.Entry<MetricName, ? extends Metric>> first =
        producer.metrics().entrySet().stream().filter(predicate).findFirst();
    double value = first.isPresent() ? first.get().getValue().value() : 0.0;
    return value < 0 ? 0.0 : value;
  });
}
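The Hermes snippet combines two defensive moves: select a metric by predicate (rather than by constructing an exact MetricName), and clamp negative readings to 0.0 before exposing them. A self-contained sketch of that logic, using a String-keyed map and a stand-in Metric interface in place of Kafka's types (both are assumptions made for the sketch):

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;

public class GaugeFilterSketch {
    interface Metric { double value(); }  // stand-in for Kafka's Metric

    // Return the first metric matching the predicate; report 0.0 when the
    // metric is absent or reads negative, mirroring the Hermes clamp above.
    static double firstNonNegative(Map<String, Metric> metrics,
                                   Predicate<Map.Entry<String, Metric>> predicate) {
        Optional<Map.Entry<String, Metric>> first =
            metrics.entrySet().stream().filter(predicate).findFirst();
        double value = first.isPresent() ? first.get().getValue().value() : 0.0;
        return value < 0 ? 0.0 : value;
    }

    public static void main(String[] args) {
        Map<String, Metric> metrics = new LinkedHashMap<>();
        metrics.put("buffer-available-bytes", () -> 1024.0);
        metrics.put("record-error-rate", () -> -1.0);

        System.out.println(firstNonNegative(metrics, e -> e.getKey().endsWith("bytes"))); // 1024.0
        System.out.println(firstNonNegative(metrics, e -> e.getKey().endsWith("rate")));  // 0.0 (clamped)
    }
}
```

Matching by predicate keeps the gauge registration resilient to tag differences in the full MetricName, at the cost of silently reporting 0.0 when the predicate matches nothing.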

Code example source: com.netflix.suro/suro-kafka-producer

@Override
public String getStat() {
  Map<MetricName, ? extends Metric> metrics = producer.metrics();
  StringBuilder sb = new StringBuilder();
  // add kafka producer stats, which are rates
  for (Map.Entry<MetricName, ? extends Metric> e : metrics.entrySet()) {
    sb.append("kafka.").append(e.getKey()).append(": ").append(e.getValue().value()).append('\n');
  }
  // also report our counters
  sb.append("messages-in-queue4sink: ").append(this.queue4Sink.size()).append('\n');
  sb.append("queued-jobs: ").append(this.jobQueue.size()).append('\n');
  sb.append("active-threads: ").append(this.senders.getActiveCount()).append('\n');
  sb.append("received-messages: ").append(this.receivedCount.get()).append('\n');
  sb.append("sent-messages: ").append(this.sentCount.get()).append('\n');
  sb.append("sent-bytes: ").append(this.sentByteCount.get()).append('\n');
  sb.append("dropped-messages: ").append(this.droppedCount.get()).append('\n');
  sb.append("requeued-messages: ").append(this.requeuedCount.get()).append('\n');
  return sb.toString();
}

Code example source: org.apache.apex/malhar-kafka-common

stats[i].bytesPerSec = cMetrics.get(stats[i].bytePerSecMK).value();
stats[i].msgsPerSec = cMetrics.get(stats[i].msgPerSecMK).value();

Code example source: com.hotels.road/road-onramp-kafka-impl

registry.gauge(name, metric, m -> m.value());

Code example source: apache/apex-malhar (identical to the org.apache.apex/malhar-kafka-common snippet above)

Code example source: apache/samza

private void populateCurrentLags(Set<SystemStreamPartition> ssps) {
  Map<MetricName, ? extends Metric> consumerMetrics = kafkaConsumer.metrics();
  // populate the MetricNames the first time through
  if (perPartitionMetrics.isEmpty()) {
    HashMap<String, String> tags = new HashMap<>();
    tags.put("client-id", clientId); // required by the KafkaConsumer to expose the metrics
    for (SystemStreamPartition ssp : ssps) {
      TopicPartition tp = KafkaSystemConsumer.toTopicPartition(ssp);
      perPartitionMetrics.put(ssp, new MetricName(tp + ".records-lag", "consumer-fetch-manager-metrics", "", tags));
    }
  }
  for (SystemStreamPartition ssp : ssps) {
    MetricName mn = perPartitionMetrics.get(ssp);
    Metric currentLagMetric = consumerMetrics.get(mn);
    // The high watermark is the offset of the last available message,
    // so the lag is at least 0, which matches Samza's definition.
    // If the lag is not 0, isAtHead is false and kafkaClient keeps polling.
    long currentLag = (currentLagMetric != null) ? (long) currentLagMetric.value() : -1L;
    latestLags.put(ssp, currentLag);
    // calls setIsAtHead on the BlockingEnvelopeMap
    sink.setIsAtHighWatermark(ssp, currentLag == 0);
  }
}
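The core of the Samza snippet is small: look up the per-partition records-lag metric, treat a missing metric as lag -1 ("unknown"), and treat exactly 0 as caught up. A self-contained sketch of just that decision logic, with a String-keyed map and a stand-in Metric interface replacing Kafka's types (assumptions made for the sketch):

```java
import java.util.HashMap;
import java.util.Map;

public class LagCheckSketch {
    interface Metric { double value(); }  // stand-in for Kafka's Metric

    // Mirrors the Samza logic above: a missing lag metric maps to -1
    // ("unknown") so it is never mistaken for "caught up".
    static long currentLag(Map<String, Metric> consumerMetrics, String metricName) {
        Metric lagMetric = consumerMetrics.get(metricName);
        return (lagMetric != null) ? (long) lagMetric.value() : -1L;
    }

    // Only an exact lag of 0 counts as being at the high watermark.
    static boolean isAtHighWatermark(long lag) {
        return lag == 0;
    }

    public static void main(String[] args) {
        Map<String, Metric> metrics = new HashMap<>();
        metrics.put("topic-0.records-lag", () -> 0.0);

        long lag = currentLag(metrics, "topic-0.records-lag");
        System.out.println(lag + " atHead=" + isAtHighWatermark(lag)); // 0 atHead=true
        System.out.println(currentLag(metrics, "topic-1.records-lag")); // -1 (unknown)
    }
}
```

Using -1 as the "metric not found" sentinel is what makes the isAtHighWatermark check safe: an unregistered metric cannot accidentally signal that the consumer has caught up.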

Code example sources: org.apache.samza/samza-kafka and org.apache.samza/samza-kafka_2.11 (both repeat the populateCurrentLags() implementation above verbatim)

Code example source: com.netflix.suro/suro-kafka-producer

// (fragment; the leading assignments and the MetricName name/group
// arguments were truncated in the source)
double availableBytes = producer.metrics().get(
    new MetricName(
        "desc",
        "client-id",
        props.getProperty("client.id"))).value();
double throughputRate = Math.max(outgoingRate, 1.0);
return (long) (consumedMemory / throughputRate * 1000);

Code example source: airbnb/plog

@Override
public JsonObject getStats() {
  Map<MetricName, ? extends Metric> metrics = producer.metrics();
  JsonObject stats = new JsonObject()
    .add("seen_messages", seenMessages.get())
    .add("failed_to_send", failedToSendMessageExceptions.get());
  // Map to Plog v4-style naming
  for (Map.Entry<String, MetricName> entry : SHORTNAME_TO_METRICNAME.entrySet()) {
    Metric metric = metrics.get(entry.getValue());
    if (metric != null) {
      stats.add(entry.getKey(), metric.value());
    } else {
      stats.add(entry.getKey(), 0.0);
    }
  }
  // Use default kafka naming, include all producer metrics
  for (Map.Entry<MetricName, ? extends Metric> metric : metrics.entrySet()) {
    double value = metric.getValue().value();
    String name = metric.getKey().name().replace("-", "_");
    if (value > -Double.MAX_VALUE && value < Double.MAX_VALUE) {
      stats.add(name, value);
    } else {
      stats.add(name, 0.0);
    }
  }
  return stats;
}
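The range check in the plog snippet guards against non-finite readings: rate metrics that have no samples yet can report -Infinity, +Infinity, or NaN, and the comparison against Double.MAX_VALUE filters all three (NaN fails every comparison). A self-contained sketch of that guard:

```java
public class FiniteGuardSketch {
    // Mirrors the plog guard above: anything outside the finite double
    // range, including NaN (which fails both comparisons), becomes 0.0.
    static double finiteOrZero(double value) {
        return (value > -Double.MAX_VALUE && value < Double.MAX_VALUE) ? value : 0.0;
    }

    public static void main(String[] args) {
        System.out.println(finiteOrZero(12.5));                     // 12.5
        System.out.println(finiteOrZero(Double.NEGATIVE_INFINITY)); // 0.0
        System.out.println(finiteOrZero(Double.NaN));               // 0.0
    }
}
```

On Java 8 and later the same intent can be written more directly as Double.isFinite(value) ? value : 0.0.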
