apache flink上的吞吐量和延迟

41ik7eoe  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(594)

我已经为apache flink编写了一个非常简单的java程序,现在我对测量吞吐量(每秒处理的元组数)和延迟(程序需要处理每个输入元组的时间)等统计数据很感兴趣。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.readTextFile("/home/LizardKing/Documents/Power/Prova.csv")
        .map(new MyMapper().writeAsCsv("/home/LizardKing/Results.csv");

JobExecutionResult res = env.execute();

我知道flink暴露了一些指标:
https://ci.apache.org/projects/flink/flink-docs-release-1.2/monitoring/metrics.html
但是我不知道如何使用它们来获得我想要的。从这个链接中我读到一个“meter”可以用来测量平均吞吐量,但是在定义了它之后,我应该如何使用它呢?

vhipe2zx

vhipe2zx1#

我们正在运行的定制指标,如米,在我们的生产流水作业在Yarn运行计量。
以下是步骤:
pom.xml的附加依赖关系

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-metrics-dropwizard</artifactId>
    <version>${flink.version}</version>
</dependency>

我们使用的是1.2.1版本
然后将meter添加到mymapper类。

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.dropwizard.metrics.DropwizardMeterWrapper;
import org.apache.flink.metrics.Meter;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Test {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env
                .readTextFile("/home/LizardKing/Documents/Power/Prova.csv")
                .map(new MyMapper())
                .writeAsCsv("/home/LizardKing/Results.csv");

        JobExecutionResult res = env.execute();
    }

    private static class MyMapper extends RichMapFunction<String, Object> {

        private transient Meter meter;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            this.meter = getRuntimeContext()
                    .getMetricGroup()
                    .meter("myMeter", new DropwizardMeterWrapper(new com.codahale.metrics.Meter()));
        }

        @Override
        public Object map(String value) throws Exception {    
            this.meter.markEvent();
            return value;
        }
    }
}

希望这有帮助。

相关问题