I am testing my custom trigger with a test harness. A simplified snippet is attached below:
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class CustomTrigger extends Trigger<InputPOJO, TimeWindow> {

    private final ReducingStateDescriptor<Long> previousTriggerDesc =
            new ReducingStateDescriptor<>("previous-trigger", new Max(), LongSerializer.INSTANCE);
    private final long allowedLatenessMillis;

    public CustomTrigger(long allowedLatenessMillis) {
        this.allowedLatenessMillis = allowedLatenessMillis;
    }

    @Override
    public TriggerResult onElement(InputPOJO inputPOJO, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        ReducingState<Long> previousTriggerState = ctx.getPartitionedState(previousTriggerDesc);
        Long previousTriggerTime = previousTriggerState.get();
        // Remove the previous timer; otherwise the trigger would fire twice.
        if (previousTriggerTime != null) {
            ctx.deleteProcessingTimeTimer(previousTriggerTime); // NOTE
            System.out.println("deleteProcessingTimeTimer(previousTriggerTime)" + previousTriggerTime); // Invoked
        }
        // Register a new timer for the current InputPOJO.
        long currentTriggerTime = ctx.getCurrentProcessingTime() + allowedLatenessMillis;
        ctx.registerProcessingTimeTimer(currentTriggerTime);
        // Store currentTriggerTime in previousTriggerState.
        previousTriggerState.add(currentTriggerTime);
        return TriggerResult.CONTINUE;
    }
    ...
}
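In the custom trigger, I register a new timer for every new InputPOJO. When I register the timer, I delete the previous one (based on the previous timer's trigger time, which is stored in a ReducingState).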
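To make the expected timer count concrete, here is a minimal plain-Java sketch of the replace-the-previous-timer logic described above. It is only a model, not Flink's API: the class `TimerReplacementSketch`, its methods, and the 60-second session-gap timer are hypothetical names and values chosen for illustration, and the session-gap timer is treated as one fixed entry, which is a simplification.

```java
import java.util.TreeSet;

/** Plain-Java model of the timer bookkeeping; hypothetical, NOT Flink's API. */
class TimerReplacementSketch {
    // Pending timers, kept as a set of timestamps in milliseconds.
    private final TreeSet<Long> timers = new TreeSet<>();
    private final long allowedLatenessMillis;
    // Stands in for the "previous-trigger" state of the custom trigger.
    private Long previousTriggerTime;

    TimerReplacementSketch(long allowedLatenessMillis) {
        this.allowedLatenessMillis = allowedLatenessMillis;
    }

    /** Registers a timer that is not managed by onElement (e.g. a session-gap timer). */
    void registerTimer(long timestampMillis) {
        timers.add(timestampMillis);
    }

    /** Mirrors the trigger logic: delete the previous timer, then register a new one. */
    void onElement(long nowMillis) {
        if (previousTriggerTime != null) {
            timers.remove(previousTriggerTime);
        }
        long currentTriggerTime = nowMillis + allowedLatenessMillis;
        timers.add(currentTriggerTime);
        previousTriggerTime = currentTriggerTime;
    }

    boolean hasTimer(long timestampMillis) {
        return timers.contains(timestampMillis);
    }

    int numTimers() {
        return timers.size();
    }

    public static void main(String[] args) {
        TimerReplacementSketch sketch = new TimerReplacementSketch(3_000L);
        sketch.registerTimer(60_000L); // assumed session-gap timer
        sketch.onElement(1_000L);      // registers a timer at 4000
        sketch.onElement(2_000L);      // deletes 4000, registers 5000
        System.out.println(sketch.numTimers()); // prints 2
    }
}
```

In this model the count stays at 2 only because both elements see the same `previousTriggerTime`. If the stored value were scoped differently for each element, the earlier timer would not be found and the count would grow to 3.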
I am testing the timer count (as well as the window) with the code snippet below.
private OneInputStreamOperatorTestHarness<InputPOJO, OutputPOJO> testHarness;
private CustomWindowFunction customWindowFunction;

@Before
public void setup_testHarness() throws Exception {
    KeySelector<InputPOJO, String> keySelector = InputPOJO::getKey;
    TypeInformation<InputPOJO> STRING_INT_TUPLE = TypeInformation.of(new TypeHint<InputPOJO>() {}); // Any suggestion ?
    ListStateDescriptor<InputPOJO> stateDesc = new ListStateDescriptor<>("window-contents", STRING_INT_TUPLE.createSerializer(new ExecutionConfig())); // Any suggestion ?
    /**
     * Creating windowOperator for the below function
     *
     * <pre>
     *
     * DataStream<OutputPOJO> OutputPOJOStream =
     *     inputPOJOStream
     *         .keyBy(InputPOJO::getKey)
     *         .window(ProcessingTimeSessionWindows.withGap(Time.seconds(triggerMaximumTimeoutSeconds)))
     *         .trigger(new CustomTrigger(triggerAllowedLatenessMillis))
     *         .process(new CustomWindowFunction(windowListStateTtlMillis));
     * </pre>
     */
    customWindowFunction = new CustomWindowFunction(secondsToMillis(windowListStateTtlMillis));
    WindowOperator<String, InputPOJO, Iterable<InputPOJO>, OutputPOJO, TimeWindow> operator =
            new WindowOperator<>(
                    // setting .window(ProcessingTimeSessionWindows.withGap(maxTimeout))
                    ProcessingTimeSessionWindows.withGap(Time.seconds(triggerMaximumTimeoutSeconds)),
                    new TimeWindow.Serializer(),
                    // setting .keyBy(InputPOJO::getKey)
                    keySelector,
                    BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                    stateDesc,
                    // setting .process(new CustomWindowFunction(windowListStateTtlMillis))
                    // pass the instance created above, not the class name
                    new InternalIterableProcessWindowFunction<>(customWindowFunction),
                    // setting .trigger(new CustomTrigger(allowedLateness))
                    new CustomTrigger(secondsToMillis(allowedLatenessSeconds)),
                    0,
                    null);
    // Creating testHarness for window operator
    testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, keySelector, BasicTypeInfo.STRING_TYPE_INFO);
    // Setup and Open Test Harness
    testHarness.setup();
    testHarness.open();
}

@Test
public void test_allowedLateness_extension_on_second_pojo() throws Exception {
    int allowedLatenessSeconds = 3;
    int listStateTTL = 10;
    // 1. Arrange
    InputPOJO listStateInput1 = new InputPOJO(1, "Arjun");
    InputPOJO listStateInput2 = new InputPOJO(2, "Arun");
    // 2. Act
    // listStateInput1 comes at 1 sec
    testHarness.setProcessingTime(secondsToMillis(1));
    testHarness.processElement(new StreamRecord<>(listStateInput1));
    // listStateInput2 comes at 2 sec, i.e. in the allowedLateness period of listStateInput1
    testHarness.setProcessingTime(secondsToMillis(2));
    testHarness.processElement(new StreamRecord<>(listStateInput2));
    // Expectation: listStateInput2 deletes the existing untriggered timer of listStateInput1 and registers a new timer.
    // Actual: listStateInput2 registered a new timer and the total count is 3.
    // NOTE:
    // 1. Here I am using SessionWindow, so by default 1 timer would be registered for SessionGap.
    // 2. Second timer should be the InputPOJO registered timer.
    Assert.assertEquals(2, testHarness.numProcessingTimeTimers()); // FAILS
}
Here, the call ctx.deleteProcessingTimeTimer(previousTriggerTime); is being invoked, but the timer count in the test harness still shows 3.
Is this a bug in the test harness?
Please suggest a way to test the timer count with the test harness.
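P.S.:
Although this looks like the typical behavior of a session window gap, I use this custom trigger in a more complex computation. For simplicity, I have reduced the logic to the above.
I am using a ListStateDescriptor when creating the WindowOperator for the test harness.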