我目前正在storm中度量一种新的分组方法,因此吞吐量和延迟是最重要的,但是在度量每个元组的端到端延迟时遇到了一些困难。
我试图在元组中添加时间戳,并计算在拓扑下游接收到它时的延迟,但结果中存在负数。
因为我在集群模式下运行拓扑,所以集群中的机器之间无法精确地同步时间(我尝试过) NTP
但也不够精确),这可能是问题的原因。
那么storm本身是否提供了某种方法来测量每个元组的端到端延迟呢?或者我能用什么方法来达到目的?
我目前正在storm中度量一种新的分组方法,因此吞吐量和延迟是最重要的,但是在度量每个元组的端到端延迟时遇到了一些困难。
我试图在元组中添加时间戳,并计算在拓扑下游接收到它时的延迟,但结果中存在负数。
因为我在集群模式下运行拓扑,所以集群中的机器之间无法精确地同步时间(我尝试过) NTP
但也不够精确),这可能是问题的原因。
那么storm本身是否提供了某种方法来测量每个元组的端到端延迟呢?或者我能用什么方法来达到目的?
1条答案
按热度按时间b1payxdu1#
终于有办法了!
我们实现了我们自己的spout和irichspout接口
nextTuple()
方法我们为元组分配了一个唯一的id,并将时间戳放入ConcurrentHashMap
,然后通过执行likecollector.emit(new Values(str), ID);
使storm知道元组被分配了一个id,因此在我们的ack(Object msgId)
方法,我们通过给出键来检查hashmap中的时间msgId
,与当前时间比较,我们得到了这个元组的延迟!当我们设置喷口平行度为1时,我们就不必再担心时间同步问题了。