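I am using KafkaSource to read Kafka messages of type Events. According to the documentation, an event-time timestamp extractor is optional for the Kafka source.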
KafkaSource<Events> source =
    KafkaSource.<Events>builder()
        .setProperties(kafkaProperties)
        .setBootstrapServers(parameters.get("bootstrap-servers-source"))
        .setTopics(parameters.get("source-topic"))
        .setGroupId("visit-events-flink-mvp")
        .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
        //.setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new EventsDeserializationSchema())
        .build();

// event stream from kafka source
DataStream<Events> eventStream =
    env.fromSource(source, WatermarkStrategy.forMonotonousTimestamps(), "Kafka Source")
        // should be a unique id
        .uid("kafka-source");

// stream is keyed based on the anonymousId
DataStream<Events> keyedStream =
    eventStream.keyBy(Events::getAnonymousId)
        // .process(new KeyedProcessing(Long.parseLong(parameters.get("ttl"))))
        .process(new KeyedProcessingWithCallBack(Long.parseLong(parameters.get("ttl"))))
        .uid("engager-events-keyed-processing");
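For context on the watermark strategy used above: as far as I know, `WatermarkStrategy.forMonotonousTimestamps()` tracks the highest timestamp seen so far and periodically emits a watermark equal to that maximum minus 1 ms. The plain-Java sketch below imitates that rule without any Flink dependency. The class and method names are hypothetical, and the timestamp in `main` is an arbitrary example value, not taken from the job.

```java
public class MonotonousWatermarkSketch {
    // Start one above Long.MIN_VALUE so that "max - 1" cannot overflow.
    private long maxTimestamp = Long.MIN_VALUE + 1;

    // Called for every record: remember the highest timestamp seen so far.
    public void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    // Called periodically: the watermark trails the highest timestamp by 1 ms.
    public long onPeriodicEmit() {
        return maxTimestamp - 1;
    }

    public static void main(String[] args) {
        MonotonousWatermarkSketch generator = new MonotonousWatermarkSketch();
        // Before any record arrives, the emitted watermark is Long.MIN_VALUE.
        System.out.println(generator.onPeriodicEmit());
        generator.onEvent(1_669_610_388_000L); // arbitrary example timestamp
        System.out.println(generator.onPeriodicEmit());
    }
}
```

The point of the sketch is that the watermark only moves when records with timestamps actually reach the generator; until then it stays at `Long.MIN_VALUE`.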
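In my KeyedProcessingWithCallBack I register an event-time timer for 60 seconds, but the callback never fires.
My Kafka source has 8 partitions, and I am running the job with a parallelism of 1.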
public void processElement(EngagerEvents value, KeyedProcessFunction<String, EngagerEvents, String>.Context ctx, Collector<String> out) throws Exception {
    ObjectMapper objectMapper = new ObjectMapper();
    JsonNode jsonNode = objectMapper.readTree(value.getEventString());
    System.out.println("time : " + jsonNode.get("EVENT_TIMESTAMP").textValue());
    if (anonymousIdHasBeenSeen.value() == null) {
        System.out.println("time stamp emitting: " + jsonNode.get("EVENT_TIMESTAMP").textValue());
        // key is not available in the state
        anonymousIdHasBeenSeen.update(true);
        System.out.println("TIMER START TIME: " + ctx.timestamp());
        out.collect(value.getEventString());
        ctx.timerService().registerEventTimeTimer(ctx.timestamp() + (stateTtl * 1000));
    }
}

// not getting triggered
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out)
        throws Exception {
    // triggers after ttl has passed
    System.out.println("Call back triggered : time : " + timestamp + " value : " + anonymousIdHasBeenSeen.value());
    anonymousIdHasBeenSeen.clear();
}
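To make the expected behaviour concrete: an event-time timer fires only once the operator's watermark reaches the timer's timestamp, no matter how much wall-clock time has passed. The sketch below is a simplified plain-Java stand-in for that rule, not Flink's actual timer service. `EventTimeTimerSketch` and its methods are hypothetical names, and the timestamps are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

public class EventTimeTimerSketch {
    // No watermark received yet: same sentinel value that Flink reports.
    private long currentWatermark = Long.MIN_VALUE;
    // Pending timers, smallest timestamp first.
    private final PriorityQueue<Long> timers = new PriorityQueue<>();

    public void registerEventTimeTimer(long timestamp) {
        timers.add(timestamp);
    }

    // Advances the watermark and returns the timers that fire as a result.
    public List<Long> advanceWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
        List<Long> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.peek() <= currentWatermark) {
            fired.add(timers.poll());
        }
        return fired;
    }

    public static void main(String[] args) {
        EventTimeTimerSketch service = new EventTimeTimerSketch();
        long eventTime = 1_000L;                                  // made-up event timestamp
        service.registerEventTimeTimer(eventTime + 60 * 1000L);   // 60-second timer
        System.out.println(service.advanceWatermark(60_999L));    // watermark still behind the timer
        System.out.println(service.advanceWatermark(61_000L));    // watermark has reached the timer
    }
}
```

If the watermark never advances past `Long.MIN_VALUE`, the loop in `advanceWatermark` never releases anything, which matches a callback that is registered but never invoked.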
Test simulator code, which sends events with anonymousId=111 and different event timestamps:
try {
    for (int i = 0; i < 500; i++) {
        String[] anonymousId = {"111"};
        String key = String.valueOf(new Random().nextInt(10));
        ProducerRecord<String, String> record = new ProducerRecord<>(
            "flink-visits-mvp-test-source",
            key,
            // getEvent(UUID.randomUUID().toString() + "-" +Thread.currentThread().getName() , event[new Random().nextInt(1)]));
            // getEvent(anonymousId[new Random().nextInt(1)], event[new Random().nextInt(1)]));
            getEvent(anonymousId[new Random().nextInt(1)],
                System.currentTimeMillis(),
                event));
        //System.out.println(record.value().toString());
        producer.send(record);
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}
Am I doing something wrong? Why is my event-time timer callback not firing?
1 Answer
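I ran into the same problem with Flink v1.16. The processElement method was called as expected, but **ctx.timerService().currentWatermark()** always printed -9223372036854775808 (that is, Long.MIN_VALUE, which means no watermark had reached the operator yet), and onTimer was never called for the 60-second timer.
After a lot of trial and error, I found that calling env.setParallelism() resolved the issue.
After the fix, currentWatermark() returned the correct watermark time, and onTimer was called every 60 seconds.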
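One possible explanation, which is an assumption on my part and not something verified for this job: an operator with several inputs takes the minimum of the watermarks on those inputs, so a single input that never emits a watermark keeps the combined value at Long.MIN_VALUE. The plain-Java sketch below only illustrates that minimum rule. `MinWatermarkSketch` and `combinedWatermark` are hypothetical names, and the watermark values are illustrative.

```java
public class MinWatermarkSketch {
    // Combines per-input watermarks the way a multi-input operator does:
    // the result is the smallest watermark across all inputs.
    // (For an empty array this sketch simply returns Long.MAX_VALUE.)
    public static long combinedWatermark(long[] inputWatermarks) {
        long min = Long.MAX_VALUE;
        for (long watermark : inputWatermarks) {
            min = Math.min(min, watermark);
        }
        return min;
    }

    public static void main(String[] args) {
        // One input has advanced, the other has never emitted a watermark.
        long[] oneSilentInput = {1_669_610_387_999L, Long.MIN_VALUE};
        System.out.println(combinedWatermark(oneSilentInput));

        // Both inputs have advanced, so the combined watermark moves forward.
        long[] allActive = {1_669_610_387_999L, 1_669_610_435_999L};
        System.out.println(combinedWatermark(allActive));
    }
}
```

When debugging, printing `ctx.timerService().currentWatermark()` inside processElement, as done above, is a quick way to tell whether the watermark is stuck at Long.MIN_VALUE.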
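Here are the log entries before the fix:
Event time = 1669610765000, window end time = 1669610819999, current watermark time = -9223372036854775808
Event time = 1669610807000, window end time = 1669610819999, current watermark time = -9223372036854775808
Here are the log entries after the fix:
Event time = 1669610393000, window end time = 1669610399999, current watermark time = 1669610387999
Event time = 1669610450000, window end time = 1669610459999, current watermark time = 1669610435999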