我一直在看Kafka的源代码 Log
在Kafka项目的核心模块类,但我仍然是新的scala。我遇到了一个很难理解的语法。以下是代码片段:
代码段1:
// Now do a second pass and load all the log and index files.
// We might encounter legacy log segments with offset overflow (KAFKA-6264). We need to split such segments. When
// this happens, restart loading segment files from scratch.
retryOnOffsetOverflow {
// In case we encounter a segment with offset overflow, the retry logic will split it after which we need to retry
// loading of segments. In that case, we also need to close all segments that could have been left open in previous
// call to loadSegmentFiles().
logSegments.foreach(_.close())
segments.clear()
loadSegmentFiles()
}
代码段2:
private[log] def retryOnOffsetOverflow[T](fn: => T): T = {
while (true) {
try {
return fn// what is fn here in context of code snippet 1?
} catch {
case e: LogSegmentOffsetOverflowException =>
info(s"Caught segment overflow error: ${e.getMessage}. Split segment and retry.")
splitOverflowedSegment(e.segment)//##!!!1.return a List[Segement], but where does this return goes?
}
}
throw new IllegalStateException()
}
我发现很难理解的是这个方法 retryOnOffsetOverflow
是在代码段1中调用的,传递给它的参数是什么?我知道你的情人 retryOnOffsetOverflow
是一个函数,但在这个代码段中,传递给这个函数的参数是什么?
我也不清楚什么是回报 retryOnOffsetOverflow
在这里?回报是 T
哪种是通用的?我不知道你的回报是什么 retryOnOffsetOverflow
在这里,它是否会根据捕获异常的事实而有所不同?如果是的话,具体回报是多少?
非常感谢您的解释,如果我错过了回答问题所需的代码,请告诉我。
更新:我会纠正我自己的param retryOnOffsetOverflow
是一个按名称参数,除非在方法体的某个地方引用了它,否则不会对其求值。
3条答案
按热度按时间mtb9vblg1#
hm2xizp92#
我发现很难理解的是,在代码段1中如何调用retryonoffsetoverflow方法,以及将什么作为其param的参数传递给它?我知道retryonoffsetoverflow的param是一个函数,但是在这个片段中,传递给这个函数的参数是什么?
考虑下面的简化示例
qij5mzcb3#
upd.:略微更改了最后一部分,因为看起来它将在下一个循环迭代中加载“拆分”的文件。
参数
retryOnOffsetOverflow
下面是花括号中的所有内容-基本上,这三行代码-这就是函数(fn: => T)
,它接受。retryOnOffsetOverflow
正在尝试执行传递的函数,如果执行成功,则返回其答案。有一部分有点难以理解-当有例外的时候,splitOverflowedSegment
不是因为它的返回类型,而是因为它在replaceSegments
功能。在下一次循环迭代重新启动时,将读取这些段loadSegmentFiles
功能。