flink kinesis连接器未满容量使用来自kinesis数据流的消息

rwqw0loc  于 2021-06-24  发布在  Flink
关注(0)|答案(1)|浏览(538)

我正在测试apacheflink(使用v1.8.2)从kinesis数据流读取消息的速度。kinesis数据流只包含一个shard,它包含40000条消息。每个邮件大小小于5 kb。
试图读取流从最古老的消息使用trimïu地平线,我期待应用程序应该能够读取所有消息迅速,因为每个分片可以支持高达2 mb每秒通过getrecords的最大总数据读取速率。通过连接器配置(shard\u getrecords\u max=400,shard\u getrecords\u interval\u millis=1000),应用程序应在几分钟内完成,以读取所有消息。但由于某些原因,阅读所有信息需要花费大量时间。
你介意检查一下我的连接器配置有什么问题吗?谢谢你的帮助。

public static DataStream<ObjectNode> createKinesisStream(
            StreamExecutionEnvironment env) throws IOException {
        Properties properties = new Properties();
        properties.put(ConsumerConfigConstants.AWS_REGION, "us-east-1");
        properties.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON");
        properties.put(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "400");
        properties.put(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "1000");

        properties.put(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "PROFILE");
        properties.put(ConsumerConfigConstants.AWS_PROFILE_NAME, "d");

        return env.addSource(new FlinkKinesisConsumer<>(
                    "stream1", new JsonNodeDeserializationSchema(), properties));
    }

   main() code:
   final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
   env.getConfig().setAutoWatermarkInterval(10000L);

   source = AppConfig.createKinesisStream(env);

   DataStream<ObjectNode> filteredStream = source
                .map(new CustomMap());
I have put a counter in discarding sink, in one fetch it read 27 messages( counter 829-855)

24 Mar 2020 08:11:50,519 INFO  DiscardingSink:15 - 827
24 Mar 2020 08:11:50,519 INFO  DiscardingSink:15 - 828
24 Mar 2020 08:11:51,631 INFO  DiscardingSink:15 - 829
24 Mar 2020 08:11:51,631 INFO  DiscardingSink:15 - 830
.
.
24 Mar 2020 08:11:51,639 INFO  DiscardingSink:15 - 854
24 Mar 2020 08:11:51,639 INFO  DiscardingSink:15 - 855
24 Mar 2020 08:11:52,749 INFO  DiscardingSink:15 - 856
am46iovg

am46iovg1#

一种可能的解释是,你的管道中有什么东西在对源头施加反压力。要仅测量源的容量,可以将工作简化为:

source.addSink(new DiscardingSink<>());

哪里 DiscardingSink

public static class DiscardingSink<OUT> implements SinkFunction<OUT> {

    @Override
    public void invoke(OUT value, Context context) throws Exception {
    }
}

相关问题