(还有几个关于超时和maxspoutpending的问题)
我在storm文档中看到很多关于消息被完全处理的参考资料。但是我的kafkaspout如何知道消息何时被完全处理?
希望它知道我的螺栓连接的方式,所以当我的流中的最后一个螺栓确认一个元组时,喷口知道我的消息何时被处理?
否则,我会设想在超时期限到期后,检查消息的ack-ed状态,如果由acking/anchoring xor指示,则认为它已被处理。但我希望不是这样?
我还有关于maxtuplespending和超时配置的相关问题。
如果我将maxtuplepending设置为10k,那么我是否正确地认为每个喷口示例将继续发射元组,直到该喷口示例跟踪飞行中的10k元组,即尚未完全处理的10k元组?当当前正在传输的消息被完全处理时,会发出一个新的元组吗?
最后,这与超时配置有关吗?喷口是否以任何方式等待配置的超时发生,然后再发出新消息?或者只有当消息在处理过程中暂停/缓慢,导致消息因超时而失败时,超时配置才会起作用?
更简洁地说(或者更清楚地说),除了消息在30分钟内被最后一个插销确认不会失败之外,我的超时设置为30分钟是否有其他效果?或者是否有其他影响,例如超时配置影响喷口的排放率?
抱歉问了这么长的问题。提前感谢您的回复。
- 编辑以进一步澄清
我之所以担心这一点,是因为我的消息不一定贯穿整个流。
假设我有螺栓a,b,c,d。大多数情况下,消息将从a->b->->d传递。但我有一些信息,有意将停止对螺栓a。a将确认它们,但不会发出它们(由于我的业务逻辑,在这些情况下,我确实希望进一步处理消息)。
那么,我的kafkaspout是否会知道,已确认但未从a发出的消息已被完全处理?因为我想另一个消息,以发射从喷口一旦螺栓a完成,在这种情况下。
1条答案
按热度按时间ego6inou1#
storm通过udf代码必须使用的锚定机制跟踪整个拓扑中的元组。这种锚定导致了所谓的元组树,树的根是喷口发出的元组,所有其他节点(在树结构中连接)表示从使用输入元组作为锚定的螺栓发出的元组(这只是一个逻辑模型,在storm中没有以这种方式实现)。
例如,一个喷口发出一个句子元组,该元组由单词中的第一个词组分割,一些单词由第二个词组过滤,单词计数由第三个词组应用。最后,sink bolt将结果写入文件。这棵树看起来像这样:
最初的句子由spout发出,由bolt1用作所有发出的标记的锚点,并由bolt1确认。bolt2过滤掉“this”、“is”和“an”,只确认三个元组示例”和“句子”只是转发,用作输出元组的锚点,然后确认。同样的情况也发生在bolt2中,最后一个sink bolt只是确认所有传入的元组。
此外,storm跟踪所有元组的所有ack,即来自中间螺栓和sink螺栓的ack。首先,喷口将输出元组的id发送给acker任务。每次将一个元组用作锚定时,acker还会得到一条消息,其中包含锚定元组id和输出元组id(由storm自动生成)。来自bolt的acke也会转到与xor相同的acker任务。如果接收到所有ack(即,对于spout和所有递归锚定的输出元组),(xor结果将为零),acker将向spout发送一条消息,说明元组已被完全处理,并对其进行回呼
Spout.ack(MessageId)
发生(即,回叫在元组完全处理后立即完成)。此外,如果有一个元组被acker注册的时间超过了超时时间,acker会定期检查。如果发生这种情况,则acker会删除元组id,并向spout发送一条消息,指出元组失败(导致调用)Spout.fail(MessageId)
).此外,喷口会记录飞行中的所有元组并停止呼叫
Spout.nextTuple()
如果此计数超过maxTuplesPending
参数。据我所知,参数是全局应用的,也就是说,每个喷口任务的局部计数相加,并将全局计数与参数进行比较(但不确定具体是如何实现的)。所以
timeout
参数独立于maxTuplesPending
.