flink检查点的大小超过20gb,检查点时间超过1分钟

mzaanser  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(643)

首先是:
我是flink的新手(了解原理,能够创建任何我需要的基本流媒体工作)
我使用kinesis分析来运行我的flink作业,默认情况下,它使用1分钟间隔的增量检查点。
flink作业使用flinkkinesisconsumer和自定义反序列化器从kinesis流读取事件(将字节反序列化为整个作业中使用的简单java对象)
我想归档的只是简单地计算过去24小时内实体id/foo和实体id/bar的事件数。重要的是,这个计数是尽可能准确,这就是为什么我使用这个flink功能,而不是做一个5分钟滚动窗口自己运行总和。我还希望能够有一个'总'事件计数从一开始(而不仅仅是过去24小时),所以我也在结果中输出过去5分钟的事件计数,以便后处理应用程序可以简单地采取这5分钟的数据,并做一个运行总和(这个计数不一定要准确,如果出现中断,我会丢失一些计数,这也没关系)
现在,这项工作一直做得很好,直到上周我们的交通流量激增了10倍。从那一刻起,Flink就变成了香蕉。检查点大小开始从~500mb缓慢增长到20gb,检查点时间约为1分钟,并随着时间的推移而增长。应用程序开始失败,并且永远无法完全恢复,事件迭代器的运行时间也永远不会停止,因此没有新的事件被使用。
因为我是flink的新手,所以我不确定我做滑动计数的方式是完全没有优化,还是完全错误。
这是代码关键部分的一个小片段:
源(myjsondeserializationschema扩展了abstractdeserializationschema,只需读取byte并创建事件对象):

SourceFunction<Event> source =
      new FlinkKinesisConsumer<>("input-kinesis-stream", new MyJsonDeserializationSchema(), kinesisConsumerConfig);

将在flink操作符中使用的输入事件simple java pojo:

public class Event implements Serializable {
  public String entityId;
  public String entityType;
  public String entityName;
  public long eventTimestamp = System.currentTimeMillis();
}

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<Event> eventsStream = kinesis
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(30)) {
        @Override
        public long extractTimestamp(Event event) {
          return event.eventTimestamp;
        }
      })

DataStream<Event> fooStream = eventsStream
      .filter(new FilterFunction<Event>() {
        @Override
        public boolean filter(Event event) throws Exception {
          return "foo".equalsIgnoreCase(event.entityType);
        }
      })

 DataStream<Event> barStream = eventsStream
      .filter(new FilterFunction<Event>() {
        @Override
        public boolean filter(Event event) throws Exception {
          return "bar".equalsIgnoreCase(event.entityType);
        }
      })

StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
    Table fooTable = tEnv.fromDataStream("fooStream, entityId, entityName, entityType, eventTimestame.rowtime");
    tEnv.registerTable("Foo", fooTable);
    Table barTable = tEnv.fromDataStream("barStream, entityId, entityName, entityType, eventTimestame.rowtime");
    tEnv.registerTable("Bar", barTable);

Table slidingFooCountTable = fooTable
      .window(Slide.over("24.hour").every("5.minute").on("eventTimestamp").as("minuteWindow"))
      .groupBy("entityId, entityName, minuteWindow")
      .select("concat(concat(entityId,'_'), entityName) as slidingFooId, entityid as slidingFooEntityid, entityName as slidingFooEntityName, entityType.count as slidingFooCount, minuteWindow.rowtime as slidingFooMinute");

Table slidingBarCountTable = barTable
      .window(Slide.over("24.hout").every("5.minute").on("eventTimestamp").as("minuteWindow"))
      .groupBy("entityId, entityName, minuteWindow")
      .select("concat(concat(entityId,'_'), entityName) as slidingBarId, entityid as slidingBarEntityid, entityName as slidingBarEntityName, entityType.count as slidingBarCount, minuteWindow.rowtime as slidingBarMinute");

    Table tumblingFooCountTable = fooTable
      .window(Tumble.over(tumblingWindowTime).on("eventTimestamp").as("minuteWindow"))
      .groupBy("entityid, entityName, minuteWindow")
      .select("concat(concat(entityName,'_'), entityName) as tumblingFooId, entityId as tumblingFooEntityId, entityNamae as tumblingFooEntityName, entityType.count as tumblingFooCount, minuteWindow.rowtime as tumblingFooMinute");

    Table tumblingBarCountTable = barTable
      .window(Tumble.over(tumblingWindowTime).on("eventTimestamp").as("minuteWindow"))
      .groupBy("entityid, entityName, minuteWindow")
      .select("concat(concat(entityName,'_'), entityName) as tumblingBarId, entityId as tumblingBarEntityId, entityNamae as tumblingBarEntityName, entityType.count as tumblingBarCount, minuteWindow.rowtime as tumblingBarMinute");

    Table aggregatedTable = slidingFooCountTable
      .leftOuterJoin(slidingBarCountTable, "slidingFooId = slidingBarId && slidingFooMinute = slidingBarMinute")
      .leftOuterJoin(tumblingFooCountTable, "slidingFooId = tumblingBarId && slidingFooMinute = tumblingBarMinute")
      .leftOuterJoin(tumblingFooCountTable, "slidingFooId = tumblingFooId && slidingFooMinute = tumblingFooMinute")
      .select("slidingFooMinute as timestamp, slidingFooCreativeId as entityId, slidingFooEntityName as entityName, slidingFooCount, slidingBarCount, tumblingFooCount, tumblingBarCount");

    DataStream<Result> result = tEnv.toAppendStream(aggregatedTable, Result.class);
    result.addSink(sink); // write to an output stream to be picked up by a lambda function

我将非常感谢有更多的经验,在与Flink工作的人可以评论的方式,我做了我的计数?我的代码是否完全设计过度了?有没有更好更有效的方法来计算24小时内发生的事件?
我在stackoverflow@davidanderson的某个地方读到过,建议使用map state创建我们自己的滑动窗口,并按时间戳对事件进行切片。然而,我不完全确定这意味着什么,我没有找到任何代码示例来显示它。

mitkmikd

mitkmikd1#

你在那里创建了不少窗口。如果您正在创建一个大小为24小时、滑动时间为5分钟的滑动窗口,这意味着其中将有许多打开的窗口,因此如果您仔细考虑的话,您可能希望您在给定日期收到的所有数据都将在至少一个窗口中进行检查。因此,可以肯定的是,检查点的大小和时间会随着数据本身的增长而增长。
为了能够得到答案,如果代码可以重写,你需要提供更多的细节,你到底是要实现这里。

相关问题