builder.setSpout("spout", new TweetSpout());
builder.setBolt("bolt", new TweetCounter(), 2).fieldsGrouping("spout",
new Fields("field1"));
我在字段分组中添加了一个输入字段“field1”。根据字段分组的定义,所有具有相同“field1”的tweet都应该转到tweetcounter的单个任务。计数器螺栓的执行器设置为2。
然而,如果传入流的所有元组中的“field1”都相同,这是否意味着即使我为tweetcounter指定了2个执行器,流也只会被发送到其中一个,而另一个示例仍然是空的?
为了进一步了解我的特定用例,如何使用一个喷口并根据输入字段(field1)的特定值将数据发送到不同的螺栓?
2条答案
按热度按时间kr98yfug1#
解决这个问题的一种方法似乎是使用直接分组,其中源决定哪个组件将接收元组:
这是一种特殊的分组。以这种方式分组的流意味着元组的生产者决定使用者的哪个任务将接收这个元组。直接分组只能在已声明为直接流的流上声明。向直接流发出的元组必须使用[emitdirect](javadocs/org/apache/storm/task/outputcollector.html#emitdirect(int,int,java.util.list)方法之一发出。bolt可以通过使用提供的topologycontext或跟踪outputcollector中emit方法的输出(它返回元组发送到的任务id)来获取其使用者的任务id。
您可以在这里看到它的示例用法:
哪里
getWordCountIndex
返回此元组所在组件的索引。5lhxktic2#
使用的替代方法
emitDirect
如本文所述,答案是实现您自己的流分组。复杂性大致相同,但它允许您跨多个螺栓重用分组逻辑。例如,storm中的shuffle分组实现为
CustomStreamGrouping
具体如下:Storm要来了
prepare
告诉您分组负责的任务ID,以及拓扑上的一些上下文。当storm从使用此分组的bolt/spout发出元组时,storm将调用chooseTasks
它允许您定义元组应该转到哪些任务。然后在构建拓扑时使用分组,如图所示:注意分组需要
Serializable
线程安全。