
x33g5p2x  于2022-01-25 转载在 其他  



[英]Returns the value of the metric as a double if the metric is measurable, and 0.0 otherwise.


代码示例来源:origin: apache/flink

  /** Returns the value currently reported by the wrapped {@code kafkaMetric}. */
  public Double getValue() {
    return kafkaMetric.value();

代码示例来源:origin: apache/flink

/** Returns the value currently reported by the wrapped {@code kafkaMetric}. */
public Double getValue() {
  return kafkaMetric.value();

代码示例来源:origin: apache/flink

/** Returns the value currently reported by the wrapped {@code kafkaMetric}. */
public Double getValue() {
  return kafkaMetric.value();

代码示例来源:origin: org.apache.flink/flink-connector-kafka-base

  /** Returns the value currently reported by the wrapped {@code kafkaMetric}. */
  public Double getValue() {
    return kafkaMetric.value();

代码示例来源:origin: org.apache.flink/flink-connector-kafka-base_2.10

  /** Returns the value currently reported by the wrapped {@code kafkaMetric}. */
  public Double getValue() {
    return kafkaMetric.value();

代码示例来源:origin: org.apache.flink/flink-connector-kafka-base_2.11

  /** Returns the value currently reported by the wrapped {@code kafkaMetric}. */
  public Double getValue() {
    return kafkaMetric.value();

代码示例来源:origin: org.apache.flink/flink-connector-kafka-0.11

/** Returns the value currently reported by the wrapped {@code kafkaMetric}. */
public Double getValue() {
  return kafkaMetric.value();


/** Returns the value currently reported by the wrapped {@code kafkaMetric}. */
public Double getValue() {
  return kafkaMetric.value();

代码示例来源:origin: org.apache.flink/flink-connector-kafka-0.11_2.11

/** Returns the value currently reported by the wrapped {@code kafkaMetric}. */
public Double getValue() {
  return kafkaMetric.value();


/**
 * Builds a plain-text report of the Kafka producer metrics.
 *
 * <p>Each metric contributes one line of the form {@code kafka.<metric name>: <value>}
 * terminated by a newline.
 *
 * <p>NOTE: this is an excerpt; the closing braces of the loop and the method are not shown.
 *
 * @return the concatenated report text
 */
public String getStat() {
  // metrics map as returned by the producer
  Map<MetricName,? extends Metric> metrics = producer.metrics();
  StringBuilder sb = new StringBuilder();
  // add kafka producer stats, which are rates
  for( Map.Entry<MetricName,? extends Metric> e : metrics.entrySet() ){
    sb.append("kafka.").append(e.getKey()).append(": ").append(e.getValue().value()).append('\n');
  return sb.toString();

代码示例来源:origin: allegro/hermes

/**
 * Registers a gauge named {@code gauge} on {@code metrics} whose supplier never reports a
 * negative value.
 *
 * <p>NOTE(review): this excerpt is truncated — the expression assigned to {@code first} is
 * missing. Presumably it picks the first entry of {@code producer.metrics()} that matches
 * {@code predicate}; confirm against the full source.
 *
 * @param producer the Kafka producer (not referenced in the visible lines)
 * @param metrics the registry on which the gauge is registered
 * @param gauge the name of the gauge to register
 * @param predicate filter over metric entries (not referenced in the visible lines)
 */
private void registerGauge(Producer<byte[], byte[]> producer, HermesMetrics metrics, String gauge,
              Predicate<Map.Entry<MetricName, ? extends Metric>> predicate) {
  metrics.registerGauge(gauge, () -> {
    Optional<? extends Map.Entry<MetricName, ? extends Metric>> first =
    // fall back to 0.0 when no entry is present
    double value = first.isPresent() ? first.get().getValue().value() : 0.0;
    // clamp negative readings to 0.0
    return value < 0? 0.0 : value;


  /**
   * Builds a plain-text report made of the Kafka producer metrics followed by this
   * object's own counters.
   *
   * <p>Each producer metric contributes one line of the form
   * {@code kafka.<metric name>: <value>}; each counter contributes one line of the form
   * {@code <label>: <value>}. Every line is terminated by a newline.
   *
   * <p>NOTE: this is an excerpt; the closing braces of the loop and the method are not shown.
   *
   * @return the concatenated report text
   */
  public String getStat() {
    // metrics map as returned by the producer
    Map<MetricName,? extends Metric> metrics = producer.metrics();
    StringBuilder sb = new StringBuilder();
    // add kafka producer stats, which are rates
    for( Map.Entry<MetricName,? extends Metric> e : metrics.entrySet() ){
      sb.append("kafka.").append(e.getKey()).append(": ").append(e.getValue().value()).append('\n');
    // also report our counters
    // current sizes of the two queues and the active thread count of the sender pool
    sb.append("messages-in-queue4sink: ").append( this.queue4Sink.size() ).append('\n');
    sb.append("queued-jobs: ").append( this.jobQueue.size() ).append('\n');
    sb.append("active-threads: ").append( this.senders.getActiveCount() ).append('\n');
    // running totals read from the counter fields
    sb.append("received-messages: ").append( this.receivedCount.get() ).append('\n');
    sb.append("sent-messages: ").append( this.sentCount.get() ).append('\n');
    sb.append("sent-bytes: ").append( this.sentByteCount.get() ).append('\n');
    sb.append("dropped-messages: ").append( this.droppedCount.get() ).append('\n');
    sb.append("requeued-messages: ").append( this.requeuedCount.get() ).append('\n');

    return sb.toString();

代码示例来源:origin: org.apache.apex/malhar-kafka-common

// Copy the byte-rate and message-rate metric values into stats[i]; each metric is looked up
// in cMetrics using the key stored on the same stats entry.
stats[i].bytesPerSec = cMetrics.get(stats[i].bytePerSecMK).value();
stats[i].msgsPerSec = cMetrics.get(stats[i].msgPerSecMK).value();


// Register a gauge for `metric` under `name`, using m.value() as the value function.
registry.gauge(name, metric, m -> m.value());

代码示例来源:origin: apache/apex-malhar

// Copy the byte-rate and message-rate metric values into stats[i]; each metric is looked up
// in cMetrics using the key stored on the same stats entry.
stats[i].bytesPerSec = cMetrics.get(stats[i].bytePerSecMK).value();
stats[i].msgsPerSec = cMetrics.get(stats[i].msgPerSecMK).value();

代码示例来源:origin: apache/samza

/**
 * Refreshes the recorded lag for every partition in {@code ssps} from the Kafka consumer's
 * metrics and tells the sink whether each partition is at the high watermark.
 *
 * <p>When {@code perPartitionMetrics} is empty, it is first filled with one
 * {@code <topic-partition>.records-lag} {@link MetricName} per partition. A partition whose
 * lag metric is not found is recorded with a lag of {@code -1}.
 *
 * <p>NOTE: this is an excerpt; the closing braces of the blocks are not shown.
 *
 * @param ssps the partitions whose lag should be refreshed
 */
private void populateCurrentLags(Set<SystemStreamPartition> ssps) {
 Map<MetricName, ? extends Metric> consumerMetrics = kafkaConsumer.metrics();
 // populate the MetricNames first time
 if (perPartitionMetrics.isEmpty()) {
  HashMap<String, String> tags = new HashMap<>();
  tags.put("client-id", clientId); // this is required by the KafkaConsumer to get the metrics
  for (SystemStreamPartition ssp : ssps) {
   TopicPartition tp = KafkaSystemConsumer.toTopicPartition(ssp);
   perPartitionMetrics.put(ssp, new MetricName(tp + ".records-lag", "consumer-fetch-manager-metrics", "", tags));
 for (SystemStreamPartition ssp : ssps) {
  MetricName mn = perPartitionMetrics.get(ssp);
  Metric currentLagMetric = consumerMetrics.get(mn);
  // High watermark is fixed to be the offset of last available message,
  // so the lag is now at least 0, which is the same as Samza's definition.
  // If the lag is not 0, then isAtHead is not true, and kafkaClient keeps polling.
  // A missing metric is recorded as -1, which also counts as "not at the high watermark".
  long currentLag = (currentLagMetric != null) ? (long) currentLagMetric.value() : -1L;
  latestLags.put(ssp, currentLag);
  // tell the sink whether this partition is at the high watermark (lag of exactly 0)
  sink.setIsAtHighWatermark(ssp, currentLag == 0);

代码示例来源:origin: org.apache.samza/samza-kafka

/**
 * Refreshes the recorded lag for every partition in {@code ssps} from the Kafka consumer's
 * metrics and tells the sink whether each partition is at the high watermark.
 *
 * <p>When {@code perPartitionMetrics} is empty, it is first filled with one
 * {@code <topic-partition>.records-lag} {@link MetricName} per partition. A partition whose
 * lag metric is not found is recorded with a lag of {@code -1}.
 *
 * <p>NOTE: this is an excerpt; the closing braces of the blocks are not shown.
 *
 * @param ssps the partitions whose lag should be refreshed
 */
private void populateCurrentLags(Set<SystemStreamPartition> ssps) {
 Map<MetricName, ? extends Metric> consumerMetrics = kafkaConsumer.metrics();
 // populate the MetricNames first time
 if (perPartitionMetrics.isEmpty()) {
  HashMap<String, String> tags = new HashMap<>();
  tags.put("client-id", clientId); // this is required by the KafkaConsumer to get the metrics
  for (SystemStreamPartition ssp : ssps) {
   TopicPartition tp = KafkaSystemConsumer.toTopicPartition(ssp);
   perPartitionMetrics.put(ssp, new MetricName(tp + ".records-lag", "consumer-fetch-manager-metrics", "", tags));
 for (SystemStreamPartition ssp : ssps) {
  MetricName mn = perPartitionMetrics.get(ssp);
  Metric currentLagMetric = consumerMetrics.get(mn);
  // High watermark is fixed to be the offset of last available message,
  // so the lag is now at least 0, which is the same as Samza's definition.
  // If the lag is not 0, then isAtHead is not true, and kafkaClient keeps polling.
  // A missing metric is recorded as -1, which also counts as "not at the high watermark".
  long currentLag = (currentLagMetric != null) ? (long) currentLagMetric.value() : -1L;
  latestLags.put(ssp, currentLag);
  // tell the sink whether this partition is at the high watermark (lag of exactly 0)
  sink.setIsAtHighWatermark(ssp, currentLag == 0);

代码示例来源:origin: org.apache.samza/samza-kafka_2.11

/**
 * Refreshes the recorded lag for every partition in {@code ssps} from the Kafka consumer's
 * metrics and tells the sink whether each partition is at the high watermark.
 *
 * <p>When {@code perPartitionMetrics} is empty, it is first filled with one
 * {@code <topic-partition>.records-lag} {@link MetricName} per partition. A partition whose
 * lag metric is not found is recorded with a lag of {@code -1}.
 *
 * <p>NOTE: this is an excerpt; the closing braces of the blocks are not shown.
 *
 * @param ssps the partitions whose lag should be refreshed
 */
private void populateCurrentLags(Set<SystemStreamPartition> ssps) {
 Map<MetricName, ? extends Metric> consumerMetrics = kafkaConsumer.metrics();
 // populate the MetricNames first time
 if (perPartitionMetrics.isEmpty()) {
  HashMap<String, String> tags = new HashMap<>();
  tags.put("client-id", clientId); // this is required by the KafkaConsumer to get the metrics
  for (SystemStreamPartition ssp : ssps) {
   TopicPartition tp = KafkaSystemConsumer.toTopicPartition(ssp);
   perPartitionMetrics.put(ssp, new MetricName(tp + ".records-lag", "consumer-fetch-manager-metrics", "", tags));
 for (SystemStreamPartition ssp : ssps) {
  MetricName mn = perPartitionMetrics.get(ssp);
  Metric currentLagMetric = consumerMetrics.get(mn);
  // High watermark is fixed to be the offset of last available message,
  // so the lag is now at least 0, which is the same as Samza's definition.
  // If the lag is not 0, then isAtHead is not true, and kafkaClient keeps polling.
  // A missing metric is recorded as -1, which also counts as "not at the high watermark".
  long currentLag = (currentLagMetric != null) ? (long) currentLagMetric.value() : -1L;
  latestLags.put(ssp, currentLag);
  // tell the sink whether this partition is at the high watermark (lag of exactly 0)
  sink.setIsAtHighWatermark(ssp, currentLag == 0);


// NOTE(review): heavily truncated excerpt — the MetricName arguments and the definitions of
// outgoingRate and consumedMemory are not shown here; confirm against the full source.
// Looks up a producer metric by name to obtain the available byte count.
double availableBytes = producer.metrics().get(
  new MetricName(
  // Floor the rate at 1.0 so the division below never uses a zero or sub-1.0 divisor.
  double throughputRate = Math.max(outgoingRate, 1.0);
  // consumedMemory / rate, scaled by 1000 (presumably seconds -> milliseconds; TODO confirm).
  return (long) (consumedMemory / throughputRate * 1000);

代码示例来源:origin: airbnb/plog

/**
 * Builds a JSON object of sender statistics.
 *
 * <p>The object contains, in order:
 * <ul>
 *   <li>{@code seen_messages} and {@code failed_to_send}, read from this object's counters;</li>
 *   <li>one entry per key of {@code SHORTNAME_TO_METRICNAME}, holding the mapped producer
 *       metric's value, or {@code 0.0} when the producer does not report that metric;</li>
 *   <li>one entry per producer metric, keyed by the metric name with {@code '-'} replaced by
 *       {@code '_'}. Values that are not strictly between {@code -Double.MAX_VALUE} and
 *       {@code Double.MAX_VALUE} (NaN, infinities, and the two extremes themselves) are
 *       reported as {@code 0.0}.</li>
 * </ul>
 *
 * <p>NOTE: this is an excerpt; the closing braces of the blocks are not shown.
 *
 * @return the populated statistics object
 */
public JsonObject getStats() {
  Map<MetricName, ? extends Metric> metrics = producer.metrics();
  JsonObject stats = new JsonObject()
    .add("seen_messages", seenMessages.get())
    .add("failed_to_send", failedToSendMessageExceptions.get());
  // Map to Plog v4-style naming
  for (Map.Entry<String, MetricName> entry: SHORTNAME_TO_METRICNAME.entrySet()) {
    Metric metric = metrics.get(entry.getValue());
    if (metric != null) {
      stats.add(entry.getKey(), metric.value());
    } else {
      // metric not reported by the producer: emit 0.0 so the key is always present
      stats.add(entry.getKey(), 0.0);
  // Use default kafka naming, include all producer metrics
  for (Map.Entry<MetricName, ? extends Metric> metric : metrics.entrySet()) {
    double value = metric.getValue().value();
    String name = metric.getKey().name().replace("-", "_");
    // NaN fails both comparisons, and infinities fail one, so both fall through to 0.0
    if (value > -Double.MAX_VALUE && value < Double.MAX_VALUE) {
      stats.add(name, value);
    } else {
      stats.add(name, 0.0);
  return stats;
