本文整理了Java中org.apache.kafka.common.Metric.value()
方法的一些代码示例,展示了Metric.value()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Metric.value()
方法的具体详情如下:
包路径:org.apache.kafka.common.Metric
类名称:Metric
方法名:value
[英]The value of the metric as double if the metric is measurable and 0.0
otherwise.
[中]如果度量是可测量的,则度量的值为double,否则为'0.0'。
代码示例来源:origin: apache/flink
@Override
public Double getValue() {
return kafkaMetric.value();
}
}
代码示例来源:origin: apache/flink
@Override
public Double getValue() {
return kafkaMetric.value();
}
代码示例来源:origin: apache/flink
@Override
public Double getValue() {
return kafkaMetric.value();
}
代码示例来源:origin: org.apache.flink/flink-connector-kafka-base
@Override
public Double getValue() {
return kafkaMetric.value();
}
}
代码示例来源:origin: org.apache.flink/flink-connector-kafka-base_2.10
@Override
public Double getValue() {
return kafkaMetric.value();
}
}
代码示例来源:origin: org.apache.flink/flink-connector-kafka-base_2.11
@Override
public Double getValue() {
return kafkaMetric.value();
}
}
代码示例来源:origin: org.apache.flink/flink-connector-kafka-0.11
@Override
public Double getValue() {
return kafkaMetric.value();
}
代码示例来源:origin: com.alibaba.blink/flink-connector-kafka-0.11
@Override
public Double getValue() {
return kafkaMetric.value();
}
代码示例来源:origin: org.apache.flink/flink-connector-kafka-0.11_2.11
@Override
public Double getValue() {
return kafkaMetric.value();
}
代码示例来源:origin: com.netflix.suro/suro-kafka-producer
@Override
public String getStat() {
Map<MetricName,? extends Metric> metrics = producer.metrics();
StringBuilder sb = new StringBuilder();
// add kafka producer stats, which are rates
for( Map.Entry<MetricName,? extends Metric> e : metrics.entrySet() ){
sb.append("kafka.").append(e.getKey()).append(": ").append(e.getValue().value()).append('\n');
}
return sb.toString();
}
代码示例来源:origin: allegro/hermes
private void registerGauge(Producer<byte[], byte[]> producer, HermesMetrics metrics, String gauge,
Predicate<Map.Entry<MetricName, ? extends Metric>> predicate) {
metrics.registerGauge(gauge, () -> {
Optional<? extends Map.Entry<MetricName, ? extends Metric>> first =
producer.metrics().entrySet().stream().filter(predicate).findFirst();
double value = first.isPresent() ? first.get().getValue().value() : 0.0;
return value < 0? 0.0 : value;
});
}
代码示例来源:origin: com.netflix.suro/suro-kafka-producer
@Override
public String getStat() {
Map<MetricName,? extends Metric> metrics = producer.metrics();
StringBuilder sb = new StringBuilder();
// add kafka producer stats, which are rates
for( Map.Entry<MetricName,? extends Metric> e : metrics.entrySet() ){
sb.append("kafka.").append(e.getKey()).append(": ").append(e.getValue().value()).append('\n');
}
// also report our counters
sb.append("messages-in-queue4sink: ").append( this.queue4Sink.size() ).append('\n');
sb.append("queued-jobs: ").append( this.jobQueue.size() ).append('\n');
sb.append("active-threads: ").append( this.senders.getActiveCount() ).append('\n');
sb.append("received-messages: ").append( this.receivedCount.get() ).append('\n');
sb.append("sent-messages: ").append( this.sentCount.get() ).append('\n');
sb.append("sent-bytes: ").append( this.sentByteCount.get() ).append('\n');
sb.append("dropped-messages: ").append( this.droppedCount.get() ).append('\n');
sb.append("requeued-messages: ").append( this.requeuedCount.get() ).append('\n');
return sb.toString();
}
}
代码示例来源:origin: org.apache.apex/malhar-kafka-common
stats[i].bytesPerSec = cMetrics.get(stats[i].bytePerSecMK).value();
stats[i].msgsPerSec = cMetrics.get(stats[i].msgPerSecMK).value();
代码示例来源:origin: com.hotels.road/road-onramp-kafka-impl
registry.gauge(name, metric, m -> m.value());
});
代码示例来源:origin: apache/apex-malhar
stats[i].bytesPerSec = cMetrics.get(stats[i].bytePerSecMK).value();
stats[i].msgsPerSec = cMetrics.get(stats[i].msgPerSecMK).value();
代码示例来源:origin: apache/samza
private void populateCurrentLags(Set<SystemStreamPartition> ssps) {
Map<MetricName, ? extends Metric> consumerMetrics = kafkaConsumer.metrics();
// populate the MetricNames first time
if (perPartitionMetrics.isEmpty()) {
HashMap<String, String> tags = new HashMap<>();
tags.put("client-id", clientId); // this is required by the KafkaConsumer to get the metrics
for (SystemStreamPartition ssp : ssps) {
TopicPartition tp = KafkaSystemConsumer.toTopicPartition(ssp);
perPartitionMetrics.put(ssp, new MetricName(tp + ".records-lag", "consumer-fetch-manager-metrics", "", tags));
}
}
for (SystemStreamPartition ssp : ssps) {
MetricName mn = perPartitionMetrics.get(ssp);
Metric currentLagMetric = consumerMetrics.get(mn);
// High watermark is fixed to be the offset of last available message,
// so the lag is now at least 0, which is the same as Samza's definition.
// If the lag is not 0, then isAtHead is not true, and kafkaClient keeps polling.
long currentLag = (currentLagMetric != null) ? (long) currentLagMetric.value() : -1L;
latestLags.put(ssp, currentLag);
// calls the setIsAtHead for the BlockingEnvelopeMap
sink.setIsAtHighWatermark(ssp, currentLag == 0);
}
}
代码示例来源:origin: org.apache.samza/samza-kafka
private void populateCurrentLags(Set<SystemStreamPartition> ssps) {
Map<MetricName, ? extends Metric> consumerMetrics = kafkaConsumer.metrics();
// populate the MetricNames first time
if (perPartitionMetrics.isEmpty()) {
HashMap<String, String> tags = new HashMap<>();
tags.put("client-id", clientId); // this is required by the KafkaConsumer to get the metrics
for (SystemStreamPartition ssp : ssps) {
TopicPartition tp = KafkaSystemConsumer.toTopicPartition(ssp);
perPartitionMetrics.put(ssp, new MetricName(tp + ".records-lag", "consumer-fetch-manager-metrics", "", tags));
}
}
for (SystemStreamPartition ssp : ssps) {
MetricName mn = perPartitionMetrics.get(ssp);
Metric currentLagMetric = consumerMetrics.get(mn);
// High watermark is fixed to be the offset of last available message,
// so the lag is now at least 0, which is the same as Samza's definition.
// If the lag is not 0, then isAtHead is not true, and kafkaClient keeps polling.
long currentLag = (currentLagMetric != null) ? (long) currentLagMetric.value() : -1L;
latestLags.put(ssp, currentLag);
// calls the setIsAtHead for the BlockingEnvelopeMap
sink.setIsAtHighWatermark(ssp, currentLag == 0);
}
}
代码示例来源:origin: org.apache.samza/samza-kafka_2.11
private void populateCurrentLags(Set<SystemStreamPartition> ssps) {
Map<MetricName, ? extends Metric> consumerMetrics = kafkaConsumer.metrics();
// populate the MetricNames first time
if (perPartitionMetrics.isEmpty()) {
HashMap<String, String> tags = new HashMap<>();
tags.put("client-id", clientId); // this is required by the KafkaConsumer to get the metrics
for (SystemStreamPartition ssp : ssps) {
TopicPartition tp = KafkaSystemConsumer.toTopicPartition(ssp);
perPartitionMetrics.put(ssp, new MetricName(tp + ".records-lag", "consumer-fetch-manager-metrics", "", tags));
}
}
for (SystemStreamPartition ssp : ssps) {
MetricName mn = perPartitionMetrics.get(ssp);
Metric currentLagMetric = consumerMetrics.get(mn);
// High watermark is fixed to be the offset of last available message,
// so the lag is now at least 0, which is the same as Samza's definition.
// If the lag is not 0, then isAtHead is not true, and kafkaClient keeps polling.
long currentLag = (currentLagMetric != null) ? (long) currentLagMetric.value() : -1L;
latestLags.put(ssp, currentLag);
// calls the setIsAtHead for the BlockingEnvelopeMap
sink.setIsAtHighWatermark(ssp, currentLag == 0);
}
}
代码示例来源:origin: com.netflix.suro/suro-kafka-producer
"desc",
"client-id",
props.getProperty("client.id"))).value();
double availableBytes = producer.metrics().get(
new MetricName(
"desc",
"client-id",
props.getProperty("client.id"))).value();
"desc",
"client-id",
props.getProperty("client.id"))).value();
double throughputRate = Math.max(outgoingRate, 1.0);
return (long) (consumedMemory / throughputRate * 1000);
代码示例来源:origin: airbnb/plog
@Override
public JsonObject getStats() {
Map<MetricName, ? extends Metric> metrics = producer.metrics();
JsonObject stats = new JsonObject()
.add("seen_messages", seenMessages.get())
.add("failed_to_send", failedToSendMessageExceptions.get());
// Map to Plog v4-style naming
for (Map.Entry<String, MetricName> entry: SHORTNAME_TO_METRICNAME.entrySet()) {
Metric metric = metrics.get(entry.getValue());
if (metric != null) {
stats.add(entry.getKey(), metric.value());
} else {
stats.add(entry.getKey(), 0.0);
}
}
// Use default kafka naming, include all producer metrics
for (Map.Entry<MetricName, ? extends Metric> metric : metrics.entrySet()) {
double value = metric.getValue().value();
String name = metric.getKey().name().replace("-", "_");
if (value > -Double.MAX_VALUE && value < Double.MAX_VALUE) {
stats.add(name, value);
} else {
stats.add(name, 0.0);
}
}
return stats;
}
内容来源于网络,如有侵权,请联系作者删除!