我的任务是为kafka kstreams处理器中接收的消息实现重试机制。拓扑结构是这样的(名称/方法被更改以保护无辜者):
KStream<String, GenericRecord> inputStream = kStreamBuilder.stream(inputTopic,Consumed.with(Serdes.String(), getSerDe()));
return inputStream
.peek((key,value) -> metricsService.startTimer())
.map((key, value) -> KeyValue.pair(new MessageKey(key), mapInputMessageToInternalMessage(key, value)))
.mapValues(tokenizationService::detokenize)
.mapValues(outputMapper::mapInternalMessageToOutputMessage)
.peek((key, value) -> loggingUtil.startTransactionLog(value.getInputMessage()))
.mapValues(someOtherProcessor::writeSomewhereElse)
.filter(this::filterStream)
.mapValues(this::toOutputMessageWrapper)
;
我意识到我已经离开了很多,但一般来说,我需要能够“重试”这个流程,如果它打破中间的某个地方。例如, detokenize
是外部rest api,可能会遇到暂时错误。如果是这样,我希望能够在指数退避计划上重试,希望它成功。
今天,每一家商店都有支票 mapValues
回调以查看前一步中是否发生了错误,如果是这样,它们基本上就是失败的。在这些情况下 toOutputMessageWrapper
将阻止将消息发送到输出主题。我要做的就是从头再来一遍。
目前我看到的唯一可能的方法(我远不是kafka/kakfastreamsMaven)是在kstream的每个阶段实现重试机制(在不同的阶段) mapValues
方法),而不是试图“重新开始”。
还有其他建议吗?
暂无答案!
目前还没有任何答案,快来回答吧!