我是风暴三叉戟新手。我为三叉戟之州感到头昏眼花。据我所知,trident维护每个批处理的状态(即元数据)(是否一个批处理中的所有元组都是通过在数据库中维护事务id来完全处理的),我不完全确定下面的语句做了什么
TridentState urlToTweeters =
topology.newStaticState(getUrlToTweetersState());
有人能解释一下当我们定义上述代码时实际发生了什么吗?
我是风暴三叉戟新手。我为三叉戟之州感到头昏眼花。据我所知,trident维护每个批处理的状态(即元数据)(是否一个批处理中的所有元组都是通过在数据库中维护事务id来完全处理的),我不完全确定下面的语句做了什么
TridentState urlToTweeters =
topology.newStaticState(getUrlToTweetersState());
有人能解释一下当我们定义上述代码时实际发生了什么吗?
2条答案
按热度按时间tgabmvqs1#
风暴维基上有关于三叉戟状态的好文档。你的问题的简单答案是
urlToTweeters
是可以从中查询的状态对象。我假设上面的陈述来自三叉戟教程,复制如下:在这个例子中,
urlToTweeters
将存储到tweeter和drpc的urlMapreach
在下一行定义的查询(将url作为其参数)最终将产生reach。但是在途中(用注解内联标记),你会看到每个url的tweeter流,也就是说,查询的结果urlToTweeters
.qmb5sa222#
我希望回答永远不会太迟,至少别人会觉得我的答案有用:)
所以,
topology.newStaticState()
是trident对可查询数据存储的抽象。的参数newStaticState()
应该是基于方法契约的storm.trident.state.StateFactory
. 反过来,工厂应该实施makeState()
方法返回的示例storm.trident.state.State
. 但是,如果您计划查询您的状态,则应返回istancestorm.trident.state.map.ReadOnlyMapState
相反,因为storm.trident.state.State
没有用于查询实际数据源的方法(如果您尝试使用除ReadOnlyMapState
).所以,让我们试试吧!
虚拟状态实现:
工厂:
一个简单的
psvm
(又名public static void main
):最后,输出:
您可以看到,statequery()从输入批处理中获取值,并将它们Map到“数据存储”中找到的值。
潜水再深一点,你就可以看看
MapGet
类(其示例用于在拓扑内部查询的对象)并在其中查找以下内容:所以在引擎盖下它只会叫
multiGet()
你的方法ReadOnlyMapState
实现,然后发出在数据存储中找到的值,将它们添加到已经存在的元组中。您可以(尽管这可能不是最好的做法)创建自己的实现BaseQueryFunction<ReadOnlyMapState, Object>
做更复杂的事。