添加更多节点时,storm如何处理字段分组?

prdp8dxp  于 2021-06-24  发布在  Storm
关注(0)|答案(4)|浏览(317)

在阅读更多关于storm的细节时,我们发现了它的字段分组功能,例如,如果你对每个用户的tweet进行计数,并且有两个任务的字段分组为用户id,那么相同的用户id将被发送到相同的任务。
因此,任务1的内存中可以有以下计数bob:10 alice:5
任务2的内存中可能有以下计数jill:10 joe: 4
如果我向集群添加了一台新机器以增加容量并重新平衡运行,那么内存中的计数会发生什么变化?你会开始得到不同数量的用户吗?

sauutmhj

sauutmhj1#

使用字段分组,我们可以引导特定字段转到特定任务。
字段分组:流按分组中指定的字段进行分区。例如,如果流按“user id”字段分组,则具有相同“user id”的元组将始终转到同一个任务,但是具有不同“user id”的元组可能转到不同的任务。
这些任务在风暴的生命周期中总是静态的,您可以使用 rebalance 是执行器(线程)的数目。在向集群添加新节点的情况下,允许您重新配置要运行的执行器的数量,而无需关闭拓扑,但无论任务的数量如何,都保持不变。只是添加一个新节点可以通过调整storm的并行性来提高性能。

bpzcxfmw

bpzcxfmw2#

要完全理解它,您必须查看代码:
字段分组取决于字段字符串,而不是由哪个喷口发出。所以重新平衡不会影响它。这是函数:https://github.com/apache/storm/blob/3b1ab3d8a7da7ed35adc448d24f1f1ccb6c5ff27/storm-core/src/jvm/org/apache/storm/daemon/grouperfactory.java#l157-l161型

@Override
public List<Integer> chooseTasks(int taskId, List<Object> values) {
    int targetTaskIndex = Math.abs(TupleUtils.listHashCode(outFields.select(groupFields, values))) % numTasks;
    return Collections.singletonList(targetTasks.get(targetTaskIndex));
}

tupleutils.listhashcode导致

public static <T> int listHashCode(List<T> alist) {
  if (alist == null) {
      return 1;
  } else {
      return Arrays.deepHashCode(alist.toArray());
  }
}
ne5o7dgx

ne5o7dgx3#

基于一个或多个元组的字段,字段分组允许您控制发送到螺栓的元组。它确保字段组合的给定值集始终发送到同一个螺栓。

kxxlusnw

kxxlusnw4#

为了在每次向同一个任务发送消息,storm将用任务数(hashcode(values)%#tasks)修改值的hashcode。如果您要增加您的任务,您的计数将不准确,因为在重新平衡后,它们可能不会转到同一个任务/工作者。

https://groups.google.com/forum/#!msg/storm-user/lCKnl8AmSVE/rVCH3uuUAcMJ

相关问题