Flink registerEventTimeTimer callback not triggered by the watermark

bogh5gae  posted on 2022-12-16 in Apache

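I am using KafkaSource to read Kafka messages of type Events. According to the documentation, supplying an event-time timestamp extractor is optional for the Kafka source.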

KafkaSource<Events> source =
                KafkaSource.<Events>builder()
                        .setProperties(kafkaProperties)
                        .setBootstrapServers(parameters.get("bootstrap-servers-source"))
                        .setTopics(parameters.get("source-topic"))
                        .setGroupId("visit-events-flink-mvp")
                        .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
                        //.setStartingOffsets(OffsetsInitializer.earliest())
                        .setValueOnlyDeserializer(new EventsDeserializationSchema())
                        .build();

        // event stream from kafka source
        DataStream<Events> eventStream =
                env.fromSource(source, WatermarkStrategy.forMonotonousTimestamps(), "Kafka Source")
                        //should be a unique id
                        .uid("kafka-source");

        // stream is keyed by anonymousId; the process function emits String (see Collector<String> in processElement)
        DataStream<String> keyedStream =
                eventStream.keyBy(Events::getAnonymousId)
                        //  .process(new KeyedProcessing(Long.parseLong(parameters.get("ttl"))))
                        .process(new KeyedProcessingWithCallBack(Long.parseLong(parameters.get("ttl"))))
                        .uid("engager-events-keyed-processing");

In my KeyedProcessingWithCallBack I register an event-time timer for 60 seconds, but the callback never fires.

My Kafka source has 8 partitions, and I am running the job with a parallelism of 1.

public void processElement(EngagerEvents value, KeyedProcessFunction<String, EngagerEvents, String>.Context ctx, Collector<String> out) throws Exception {
        ObjectMapper objectMapper = new ObjectMapper();
        JsonNode jsonNode = objectMapper.readTree(value.getEventString());
        System.out.println("time : " +jsonNode.get("EVENT_TIMESTAMP").textValue());
        if (anonymousIdHasBeenSeen.value() == null) {
            System.out.println("time stamp emitting: " +jsonNode.get("EVENT_TIMESTAMP").textValue());
            // key is not available in the state
            anonymousIdHasBeenSeen.update(true);
            System.out.println("TIMER START TIME: " +ctx.timestamp());
            out.collect(value.getEventString());
            ctx.timerService().registerEventTimeTimer(ctx.timestamp() + (stateTtl * 1000));
        }
    }

    

    // not getting triggered
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out)
            throws Exception {
        // triggers after ttl has passed
        System.out.println("Call back triggered : time : " +timestamp + " value : " +anonymousIdHasBeenSeen.value());
       anonymousIdHasBeenSeen.clear();
    }
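
For context, an event-time timer fires only once the operator's watermark reaches the timer's timestamp, so a watermark that never advances means onTimer is never called. Below is a minimal toy model of that rule in plain Java. It is not Flink's timer service; the class ToyEventTimeTimers and its methods are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Toy model of event-time timers (assumption: simplified, single key, no de-duplication).
class ToyEventTimeTimers {
    private final PriorityQueue<Long> timers = new PriorityQueue<>();
    // Before any watermark arrives, the watermark is Long.MIN_VALUE.
    private long currentWatermark = Long.MIN_VALUE;

    void registerEventTimeTimer(long timestamp) {
        timers.add(timestamp);
    }

    long currentWatermark() {
        return currentWatermark;
    }

    // Advances the watermark (it never moves backwards) and returns the timers that fire.
    List<Long> advanceWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
        List<Long> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.peek() <= currentWatermark) {
            fired.add(timers.poll());
        }
        return fired;
    }

    public static void main(String[] args) {
        ToyEventTimeTimers service = new ToyEventTimeTimers();
        long eventTime = 1_000L;  // hypothetical event timestamp in ms
        long ttlSeconds = 60L;    // same role as stateTtl in the question
        service.registerEventTimeTimer(eventTime + ttlSeconds * 1000);

        System.out.println(service.advanceWatermark(30_000L)); // prints []
        System.out.println(service.advanceWatermark(61_000L)); // prints [61000]
    }
}
```

In this model, as in the question, a timer registered at ctx.timestamp() + (stateTtl * 1000) stays pending for as long as the watermark remains below that value.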

Test simulator code, which sends events with anonymousId=111 and varying event timestamps:

try {
            for (int i = 0; i < 500; i++) {
                String[] anonymousId = {"111"};
                String key = String.valueOf(new Random().nextInt(10));
                ProducerRecord<String, String> record = new ProducerRecord<>(
                        "flink-visits-mvp-test-source",
                        key,
                        //   getEvent(UUID.randomUUID().toString() + "-"  +Thread.currentThread().getName() , event[new Random().nextInt(1)]));
                        // getEvent(anonymousId[new Random().nextInt(1)], event[new Random().nextInt(1)]));
                        getEvent(anonymousId[new Random().nextInt(1)],
                                System.currentTimeMillis(),
                                event));

                //System.out.println(record.value().toString());
                    producer.send(record);
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        }

Am I doing something wrong? Why does my event-time timer callback not fire?

5w9g7ksd  #1

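I ran into the same problem with Flink v1.16. The processElement method was called as expected, but **ctx.timerService().currentWatermark()** always printed -9223372036854775808 (Long.MIN_VALUE), and the onTimer method was never called for the 60-second timer.
After a lot of trial and error, I found that calling env.setParallelism() resolves the issue.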

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
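
One plausible reason the parallelism matters, which is an assumption and not something verified for this job: an operator's watermark is the minimum of the watermarks of all its inputs, so a single source subtask that never emits a watermark (for example, one with no Kafka partition assigned) keeps the combined watermark at Long.MIN_VALUE. The Flink Kafka connector documentation describes this case and suggests either lowering the parallelism or adding an idle timeout with WatermarkStrategy.withIdleness(...). The sketch below is a toy model in plain Java, not Flink code; ToyWatermarkMin and combine are hypothetical names.

```java
import java.util.Arrays;

// Toy model: the downstream watermark is the minimum over all input watermarks.
class ToyWatermarkMin {
    static long combine(long[] inputWatermarks) {
        // With no inputs at all, fall back to the initial watermark value.
        return Arrays.stream(inputWatermarks).min().orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        // One input has never emitted a watermark (still Long.MIN_VALUE).
        long[] withSilentInput = {1669610387999L, Long.MIN_VALUE};
        // All inputs are emitting watermarks.
        long[] allActive = {1669610387999L, 1669610435999L};

        System.out.println(combine(withSilentInput)); // prints -9223372036854775808
        System.out.println(combine(allActive));       // prints 1669610387999
    }
}
```

With a parallelism of 1 there is only one source subtask, so no silent subtask can hold the minimum back. An empty partition read by that subtask could still do so unless idleness is configured.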

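After the fix, currentWatermark() returns the correct watermark time and the onTimer method is called every 60 seconds.
Log entries before the fix:
event time = 1669610765000, window end time = 1669610819999, current watermark time = -9223372036854775808
event time = 1669610807000, window end time = 1669610819999, current watermark time = -9223372036854775808
Log entries after the fix:
event time = 1669610393000, window end time = 1669610399999, current watermark time = 1669610387999
event time = 1669610450000, window end time = 1669610459999, current watermark time = 1669610435999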
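
The window end times in these log lines are consistent with 60-second tumbling windows aligned to the epoch. That is an assumption inferred from the numbers, since the answer does not show its windowing code. The sketch below reproduces the arithmetic and the firing condition (the watermark must reach the window end); ToyWindowEnd and its methods are hypothetical names, not Flink API.

```java
// Toy arithmetic for epoch-aligned tumbling windows (assumption: 60-second size).
class ToyWindowEnd {
    static final long WINDOW_SIZE_MS = 60_000L;

    // Last millisecond covered by the window that contains eventTime.
    static long windowEnd(long eventTime) {
        long windowStart = eventTime - (eventTime % WINDOW_SIZE_MS);
        return windowStart + WINDOW_SIZE_MS - 1;
    }

    // An event-time timer set at the window end fires once the watermark reaches it.
    static boolean canFire(long eventTime, long watermark) {
        return watermark >= windowEnd(eventTime);
    }

    public static void main(String[] args) {
        System.out.println(windowEnd(1669610765000L)); // prints 1669610819999
        System.out.println(windowEnd(1669610393000L)); // prints 1669610399999

        // Before the fix the watermark is stuck at Long.MIN_VALUE, so nothing can fire.
        System.out.println(canFire(1669610765000L, Long.MIN_VALUE)); // prints false
        // After the fix the watermark advances and eventually passes the window end.
        System.out.println(canFire(1669610393000L, 1669610435999L)); // prints true
    }
}
```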
