风暴中的三叉戟状态是什么?

fwzugrvs  于 2021-06-24  发布在  Storm
关注(0)|答案(2)|浏览(470)

我是风暴三叉戟新手。我为三叉戟之州感到头昏眼花。据我所知,trident维护每个批处理的状态(即元数据)(是否一个批处理中的所有元组都是通过在数据库中维护事务id来完全处理的),我不完全确定下面的语句做了什么

  1. TridentState urlToTweeters =
  2. topology.newStaticState(getUrlToTweetersState());

有人能解释一下当我们定义上述代码时实际发生了什么吗?

tgabmvqs

tgabmvqs1#

风暴维基上有关于三叉戟状态的好文档。你的问题的简单答案是 urlToTweeters 是可以从中查询的状态对象。我假设上面的陈述来自三叉戟教程,复制如下:

  1. TridentState urlToTweeters = topology.newStaticState(getUrlToTweetersState());
  2. TridentState tweetersToFollowers = topology.newStaticState(getTweeterToFollowersState());
  3. topology.newDRPCStream("reach")
  4. .stateQuery(urlToTweeters, new Fields("args"), new MapGet(), new Fields("tweeters")).each(new Fields("tweeters"), new ExpandList(), new Fields("tweeter"))
  5. /* At this point we have the tweeters for each url passed in args */
  6. .shuffle()
  7. .stateQuery(tweetersToFollowers, new Fields("tweeter"), new MapGet(), new Fields("followers"))
  8. .parallelismHint(200)
  9. .each(new Fields("followers"), new ExpandList(), new Fields("follower"))
  10. .groupBy(new Fields("follower"))
  11. .aggregate(new One(), new Fields("one"))
  12. .parallelismHint(20)
  13. .aggregate(new Count(), new Fields("reach"));

在这个例子中, urlToTweeters 将存储到tweeter和drpc的urlMap reach 在下一行定义的查询(将url作为其参数)最终将产生reach。但是在途中(用注解内联标记),你会看到每个url的tweeter流,也就是说,查询的结果 urlToTweeters .

qmb5sa22

qmb5sa222#

我希望回答永远不会太迟,至少别人会觉得我的答案有用:)
所以, topology.newStaticState() 是trident对可查询数据存储的抽象。的参数 newStaticState() 应该是基于方法契约的 storm.trident.state.StateFactory . 反过来,工厂应该实施 makeState() 方法返回的示例 storm.trident.state.State . 但是,如果您计划查询您的状态,则应返回istance storm.trident.state.map.ReadOnlyMapState 相反,因为 storm.trident.state.State 没有用于查询实际数据源的方法(如果您尝试使用除 ReadOnlyMapState ).
所以,让我们试试吧!
虚拟状态实现:

  1. public static class ExampleStaticState implements ReadOnlyMapState<String> {
  2. private final Map<String, String> dataSourceStub;
  3. public ExampleStaticState() {
  4. dataSourceStub = new HashMap<>();
  5. dataSourceStub.put("tuple-00", "Trident");
  6. dataSourceStub.put("tuple-01", "definitely");
  7. dataSourceStub.put("tuple-02", "lacks");
  8. dataSourceStub.put("tuple-03", "documentation");
  9. }
  10. @Override
  11. public List<String> multiGet(List<List<Object>> keys) {
  12. System.out.println("DEBUG: MultiGet, keys is " + keys);
  13. List<String> result = new ArrayList<>();
  14. for (List<Object> inputTuple : keys) {
  15. result.add(dataSourceStub.get(inputTuple.get(0)));
  16. }
  17. return result;
  18. }
  19. @Override
  20. public void beginCommit(Long txid) {
  21. // never gets executed...
  22. System.out.println("DEBUG: Begin commit, txid=" + txid);
  23. }
  24. @Override
  25. public void commit(Long txid) {
  26. // never gets executed...
  27. System.out.println("DEBUG: Commit, txid=" + txid);
  28. }
  29. }

工厂:

  1. public static class ExampleStaticStateFactory implements StateFactory {
  2. @Override
  3. public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
  4. return new ExampleStaticState();
  5. }
  6. }

一个简单的 psvm (又名 public static void main ):

  1. public static void main(String... args) {
  2. TridentTopology tridentTopology = new TridentTopology();
  3. FeederBatchSpout spout = new FeederBatchSpout(Arrays.asList(new String[]{
  4. "foo"
  5. }));
  6. TridentState state = tridentTopology.newStaticState(new ExampleStaticStateFactory());
  7. tridentTopology
  8. .newStream("spout", spout)
  9. .stateQuery(state, new Fields("foo"), new MapGet(), new Fields("bar"))
  10. .each(new Fields("foo", "bar"), new Debug())
  11. ;
  12. Config conf = new Config();
  13. conf.setNumWorkers(6);
  14. LocalCluster localCluster = new LocalCluster();
  15. localCluster.submitTopology("tridentTopology", conf, tridentTopology.build());
  16. spout.feed(Arrays.asList(new Values[]{
  17. new Values("tuple-00"),
  18. new Values("tuple-01"),
  19. new Values("tuple-02"),
  20. new Values("tuple-03")
  21. }));
  22. localCluster.shutdown();
  23. }

最后,输出:

  1. DEBUG: MultiGet, keys is [[tuple-00], [tuple-01], [tuple-02], [tuple-03]]
  2. DEBUG: [tuple-00, Trident]
  3. DEBUG: [tuple-01, definitely]
  4. DEBUG: [tuple-02, lacks]
  5. DEBUG: [tuple-03, documentation]

您可以看到,statequery()从输入批处理中获取值,并将它们Map到“数据存储”中找到的值。
潜水再深一点,你就可以看看 MapGet 类(其示例用于在拓扑内部查询的对象)并在其中查找以下内容:

  1. public class MapGet extends BaseQueryFunction<ReadOnlyMapState, Object> {
  2. @Override
  3. public List<Object> batchRetrieve(ReadOnlyMapState map, List<TridentTuple> keys) {
  4. return map.multiGet((List) keys);
  5. }
  6. @Override
  7. public void execute(TridentTuple tuple, Object result, TridentCollector collector) {
  8. collector.emit(new Values(result));
  9. }
  10. }

所以在引擎盖下它只会叫 multiGet() 你的方法 ReadOnlyMapState 实现,然后发出在数据存储中找到的值,将它们添加到已经存在的元组中。您可以(尽管这可能不是最好的做法)创建自己的实现 BaseQueryFunction<ReadOnlyMapState, Object> 做更复杂的事。

展开查看全部

相关问题