比较storm bolt中的上一个元组和下一个元组

xlpyo6sf  于 2021-05-29  发布在  Hadoop
关注(0)|答案(1)|浏览(364)

我有由storm topology处理的实时数据。数据可以有四种类型,比如说a,b,c,d。这些数据中的每一个都由bolt以随机顺序使用。我需要做的是比较两个相同数据类型的元组。例如,我想比较类型a元组和下一个类型a元组,或者说比较当前类型a元组和以前收到的类型a元组。有没有办法在博尔特做到这一点?或者我必须将以前的结果保存在数据库的某个地方(比如hbase或cache),并查询它以与特定类型的当前元组进行比较。
编辑
假设类型a、b、c、d的数据流来自喷口
b4 a4 c7 d2 a3 a2 b3 c6 d1 b2 c5 c4 b1 c3 c2 c1 a1----->喷口-->螺栓
现在我要比较a1和a2,a2和a3,a3和a4。类似的b1与b2,b2与b3等。

dzhpxtsq

dzhpxtsq1#

您可以在喷口中发出元组时指定数据的类型。然后您可以使用字段分组,这样每个类型a都将转到同一个线程。这样,最多可以有4个不同的线程执行螺栓代码。保证每个线程的顺序。

builder.setBolt(BOLT_NAME, new BoltClass(),4)
.fieldsGrouping(SPOUT_NAME,new Fields("type"));

storm文档中的字段分组定义:
字段分组:流按分组中指定的字段进行分区。例如,如果流按“user id”字段分组,则具有相同“user id”的元组将始终转到同一个任务,但是具有不同“user id”的元组可能转到不同的任务。
http://storm.apache.org/documentation/concepts.html

相关问题