用apachestorm中的字段分组将输入从单个喷口发送到多个螺栓

plupiseo  于 2021-06-24  发布在  Storm
关注(0)|答案(2)|浏览(359)
builder.setSpout("spout", new TweetSpout());
builder.setBolt("bolt", new TweetCounter(), 2).fieldsGrouping("spout",
new Fields("field1"));

我在字段分组中添加了一个输入字段“field1”。根据字段分组的定义,所有具有相同“field1”的tweet都应该转到tweetcounter的单个任务。计数器螺栓的执行器设置为2。
然而,如果传入流的所有元组中的“field1”都相同,这是否意味着即使我为tweetcounter指定了2个执行器,流也只会被发送到其中一个,而另一个示例仍然是空的?
为了进一步了解我的特定用例,如何使用一个喷口并根据输入字段(field1)的特定值将数据发送到不同的螺栓?

kr98yfug

kr98yfug1#

解决这个问题的一种方法似乎是使用直接分组,其中源决定哪个组件将接收元组:
这是一种特殊的分组。以这种方式分组的流意味着元组的生产者决定使用者的哪个任务将接收这个元组。直接分组只能在已声明为直接流的流上声明。向直接流发出的元组必须使用[emitdirect](javadocs/org/apache/storm/task/outputcollector.html#emitdirect(int,int,java.util.list)方法之一发出。bolt可以通过使用提供的topologycontext或跟踪outputcollector中emit方法的输出(它返回元组发送到的任务id)来获取其使用者的任务id。
您可以在这里看到它的示例用法:

collector.emitDirect(getWordCountIndex(word),new Values(word));

哪里 getWordCountIndex 返回此元组所在组件的索引。

5lhxktic

5lhxktic2#

使用的替代方法 emitDirect 如本文所述,答案是实现您自己的流分组。复杂性大致相同,但它允许您跨多个螺栓重用分组逻辑。
例如,storm中的shuffle分组实现为 CustomStreamGrouping 具体如下:

public class ShuffleGrouping implements CustomStreamGrouping, Serializable {
    private ArrayList<List<Integer>> choices;
    private AtomicInteger current;

    @Override
    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
        choices = new ArrayList<List<Integer>>(targetTasks.size());
        for (Integer i : targetTasks) {
            choices.add(Arrays.asList(i));
        }
        current = new AtomicInteger(0);
        Collections.shuffle(choices, new Random());
    }

    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        int rightNow;
        int size = choices.size();
        while (true) {
            rightNow = current.incrementAndGet();
            if (rightNow < size) {
                return choices.get(rightNow);
            } else if (rightNow == size) {
                current.set(0);
                return choices.get(0);
            }
        } // race condition with another thread, and we lost. try again
    }
}

Storm要来了 prepare 告诉您分组负责的任务ID,以及拓扑上的一些上下文。当storm从使用此分组的bolt/spout发出元组时,storm将调用 chooseTasks 它允许您定义元组应该转到哪些任务。然后在构建拓扑时使用分组,如图所示:

TopologyBuilder tp = new TopologyBuilder();
tp.setSpout("spout", new MySpout(), 1);
tp.setBolt("bolt", new MyBolt())
  .customGrouping("spout", new ShuffleGrouping());

注意分组需要 Serializable 线程安全。

相关问题